Skip to content
alex [dot] kramer [at] g_m_a_i_l [dot] com edited this page Aug 30, 2020 · 74 revisions

Run Jupyter server for remote access (LAN)

jupyter notebook --ip [LOCAL HOST IP GOES HERE] --port 8888
jupyter notebook --ip `iip` --port 8888

Virtual environments

Create and activate/deactivate virtual environment:

python3 -m venv /path/to/new/virtual/env # E.g. /git/repo/root/.venv/venv_name
source /git/repo/root/.venv/venv_name/bin/activate
deactivate

Freeze requirements:

pip3 freeze > requirements.txt

Install requirements:

pip3 install -r requirements.txt

Disable pylint warnings

Place a comment inside a block to disable a specific warning for that block

# pylint: disable=C0321

Place at the top of the file to disable for entire file

# pylint: disable=C
# pylint: disable=W

Flush/unbuffer print

# Single print statement
print(..., flush=True)

# Entire module
from functools import partial
print = partial(print, flush=True)

All python processes:

PYTHONUNBUFFERED=TRUE

Stop urllib3 from spewing

requests.packages.urllib3.disable_warnings()

Dump iPython history to file

%history -g -f /tmp/ipyhist

Command line app skeleton

import argparse

def main(
    string_positional_arg,
    int_positional_arg,
    no_arg_flag,
    flag_with_default,
    flag_without_default,
    list_positional_arg
):
    print('string_positional_arg={}\nint_positional_arg={}\nno_arg_flag={}\nflag_with_default={}\nflag_without_default={}\nlist_positional_arg={}'.format(string_positional_arg, int_positional_arg, no_arg_flag, flag_with_default, flag_without_default, list_positional_arg))

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='Command description help text.')
    parser.add_argument('string_positional_arg', help='String positional arg help text')
    parser.add_argument('int_positional_arg', type=int, help='Int positional arg help text')
    parser.add_argument('-no_arg_flag', action='store_true', help='This flag doesn\'t take an argument')
    parser.add_argument('--flag_with_default', help='This flag has a default value', default='Flag #1 default value')
    parser.add_argument('--flag_without_default', help='This flag has no default value')
    parser.add_argument('list_positional_arg', nargs='*', help='List positional arg help text')
    args = parser.parse_args()

    exit(
        main(
            args.string_positional_arg,
            args.int_positional_arg,
            args.no_arg_flag,
            args.flag_with_default,
            args.flag_without_default,
            args.list_positional_arg
        )
    )

Unittest skeleton

import unittest

class Test(unittest.TestCase):
    def test_something(self):
        self.assertEqual(2+2,5)

if __name__ == '__main__':
    unittest.main()

Flask skeleton

'''
$.ajax({
    url: "http://127.0.0.1:81/hello",
    type: "POST",
    contentType: "application/json",
    data: JSON.stringify({'inputVar': 1}),
    success: function( data ) {
        alert( "success: " + data );
    }
});

curl -X POST http://127.0.0.1:81/hello -d '{"foo": "bar"}'
'''

from flask import Flask, request
from flask_cors import CORS

app = Flask(__name__)
CORS(app) # Necessary if you want to hit the API from browser-based javascript

@app.route('/hello', methods=['GET', 'POST'])
def helloWorld():
    print(f'GOT BODY: {request.get_data()}')
    return "🌎 Hello, cross-origin-world! 🌏"

if __name__ == '__main__':
    app.run(host='0.0.0.0', port= 81)

Parse timestamp

https://strftime.org/

from datetime import datetime
dt = datetime.strptime('28/Jul/1995:13:32:22.1234567 -0400', '%d/%b/%Y:%H:%M:%S.%f %z')

Invoke python3 pip module

pip3 install [module]
python3 -m [module]

# e.g.:
pip3 install pylint
python3 -m pylint somefile.py

Run script in debugger

python -m pdb script.py

Get original markdown from reddit post

Convert comment permalink to json:
https://www.reddit.com/r/geopolitics/comments/5bgwfj/culminating_analysis_of/
becomes
https://www.reddit.com/r/geopolitics/comments/5bgwfj.json

Retrieve, extract, convert:

# Get JSON from endpoint
import requests
response = requests.get("https://www.reddit.com/r/geopolitics/comments/5bgwfj.json")
json = response.json()

# Get selftext from JSON
selftext = json[0]['data']['children'][0]['data']['selftext']

# HTML decode selftext
import HTMLParser
selftext = HTMLParser.HTMLParser().unescape(selftext)

# Unescape line breaks
selftext = selftext.replace("\\n", "\n")

# Unescape unicode (unnecessary if retrieved via python because it's already a unicode string)
# selftext = unicode(selftext, 'unicode-escape')

print selftext

Format UNIX epoch timestamp

import datetime

epoch_time = 1500000000
date = datetime.datetime.utcfromtimestamp(epoch_time)

date.strftime("%b %d %Y %H:%M")
# Jul 14 2017 02:40

date.strftime("%Y%m%d%H%M%S")
# 20170714024000

Scrape using Selenium WebDriver with a proxy

import json
from selenium import webdriver
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.common.proxy import Proxy, ProxyType

options = webdriver.ChromeOptions()
options.add_argument('headless')
options.add_argument('window-size=1200x600')

capabilities = webdriver.DesiredCapabilities.CHROME

prox = Proxy()
prox.proxy_type = ProxyType.MANUAL
prox.http_proxy = "123.45.67.890:1234"
prox.socks_proxy = "123.45.67.890:1234"
prox.ssl_proxy = "123.45.67.890:1234"
prox.add_to_capabilities(caps)

driver = webdriver.Chrome(chrome_options=options)

##### Only do this if you get an error about wrong chromedriver version #####
# from webdriver_manager.chrome import ChromeDriverManager
# manager = ChromeDriverManager().install()
# driver = webdriver.Chrome(manager, chrome_options=options)
########################################################################

driver.set_window_size(1920, 1080)
driver.maximize_window()
driver.implicitly_wait(120)

try:
    driver.get('https://api.ipify.org?format=json')
    ipJson = driver.find_element_by_css_selector('pre').text
    ip = json.loads(ipJson)['ip']
    print("WebDriver requests configured to originate from " + ip)
except Exception as e:
    print(type(e).__name__ + " Exception: " + str(e))
    traceback.print_exc()
    print("=====> Current URL:\n" + driver.current_url)
    print("=====> Page source:\n" + driver.page_source)
    print("=====> HAR log:\n" + driver.get_log("har")) # Useful! Will print out console log messages and failed requests, etc.
    driver.save_screenshot('/tmp/webdriver.png')
    driver.close()
    driver.quit()

Filter csv file example

NOTE: SHOULD PROBABLY USE PANDAS FOR THIS
Filter on single column:

filteredLines = [line for line in open('/tmp/mp3.txt') if int(line.split(',')[0]) > 100000]
output = ''.join(filteredLines) # Already has a line break because it was never stripped
outFile = open('bigMp3.txt', 'w')
outFile.write(output)
outFile.close()

Filter on multiple columns:

rows = [line.rstrip('\n').split(',') for line in open('/tmp/mp3.txt')]
filteredLines = [','.join(row) for row in rows if int(row[0]) > 100000 and 'foo' in row[2]]
output = '\n'.join(filteredLines)
outFile = open('bigFooMp3.txt', 'w')
outFile.write(output)
outFile.close()

Filter dictionary

def filterDict(dictionary, keys):
    keyValuePairs = [(key, dictionary.get(key, None)) for key in keys]
    return dict(keyValuePairs)

Scrape Spotify API

Artists

#!/usr/bin/env python3

import time
import requests
import json

def pruneDict(dictionary, keys):
    keyValuePairs = [(key, dictionary.get(key, None)) for key in keys]
    return dict(keyValuePairs)

##########################################

url = "https://api.spotify.com/v1/me/following?type=artist&limit=50"

# Get auth token from website https://developer.spotify.com/console/get-following/
headers = {
    'Authorization': "Bearer AUTH_TOKEN_GOES_HERE",
    'Cache-Control': "no-cache"
}

artists = []
artistKeys = ["name", "genres", "followers", "popularity", "uri"]
count = 0

while True:
    count += 1
    response = requests.request("GET", url, headers=headers)
    responseJson = response.json()
    items = responseJson["artists"]["items"]
    artists.extend(items)

    try:
        url = responseJson["artists"]["next"]
        print(str(count) + ": " + url, flush=True)
    except Exception as err:
        print("BREAKING ON COUNT: #" + str(count) + ", " + str(err), flush=True)
        print("==========\n\n", flush=True)
        break

    time.sleep(1)

out = [pruneDict(artist, artistKeys) for artist in artists]
for artist in out:
    artist["followers"] = artist["followers"]["total"]

print(json.dumps(out))

IKEA curbside pickup availability monitor

Turn up volume on speakers and leave running

import json
import time
import traceback
from selenium import webdriver
from selenium.webdriver.support.ui import WebDriverWait

options = webdriver.ChromeOptions()
options.add_argument('headless')
options.add_argument('window-size=1920x1080')

capabilities = webdriver.DesiredCapabilities.CHROME

##### Do this if you get an error about wrong chromedriver version #####
from webdriver_manager.chrome import ChromeDriverManager
manager = ChromeDriverManager().install()
driver = webdriver.Chrome(manager, options=options)
########################################################################

driver.set_window_size(1920, 1080)
driver.maximize_window()
driver.implicitly_wait(3)
driver.delete_all_cookies()

spoon_url = 'https://www.ikea.com/us/en/p/dragon-spoon-stainless-steel-30091763/'
delivery_url = 'https://order.ikea.com/us/en/checkout/delivery/'

try:
    driver.get(spoon_url)
    time.sleep(5)
    add_button = driver.find_element_by_css_selector('[aria-label="Add to bag"]')
    add_button.click()
    time.sleep(5)

    attempt_count = 0

    while(True):
        attempt_count += 1
        print(f'Attempt: {attempt_count}')

        driver.get(delivery_url)
        time.sleep(5)
        zip_input = driver.find_element_by_css_selector('#zipcode')
        zip_input.click()
        zip_input.clear()
        zip_input.send_keys('10000')
        zip_submit_button = driver.find_element_by_css_selector('.zipin button')
        zip_submit_button.click()
        time.sleep(8)
        collect_button = driver.find_element_by_css_selector('.collect button.delivery__option')
        collect_button.click()
        time.sleep(1)
        store_selector = Select(driver.find_element_by_css_selector('[title="Select store where to collect your products"]'))
        store_selector.select_by_visible_text('NY, IKEA Brooklyn')
        next_button = driver.find_element_by_css_selector('.delivery__submit button')
        next_button.click()
        time.sleep(3)
        if 'Click & Collect is temporarily unavailable at this store' in driver.page_source:
            continue
        else:
            time.sleep(8)
            if 'My IKEA Order' in driver.page_source:
                for i in range(20):
                    print('🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨')
                    os.system('say "ALERT\! ALERT\! ALERT\! Ikea pickup available\! ALERT\! ALERT\! ALERT\!"')
                    time.sleep(5)
                break
            else:
                continue
except Exception as e:
    driver.save_screenshot('/tmp/webdriver_fail.png')
    print(f'🔥☠️🔥☠️🔥☠️🔥☠️🔥☠️🔥☠️🔥\n{type(e).__name__} Exception: {str(e)}')
    print('⚠️⚠️⚠️⚠️⚠️⚠️⚠️⚠️⚠️⚠️ =====> STACK TRACE:')
    traceback.print_exc()
    print(f'⚠️⚠️⚠️⚠️⚠️⚠️⚠️⚠️⚠️⚠️ =====> Current URL:\n{driver.current_url}')
    source_dump_path = '/tmp/webdriver_dump.html'
    print(f'⚠️⚠️⚠️⚠️⚠️⚠️⚠️⚠️⚠️⚠️ =====> Dumping page source to: {source_dump_path}')
    with open(source_dump_path, 'w') as source_dump_file:
        source_dump_file.write(driver.page_source)
        source_dump_file.close()

driver.close()
driver.quit()

Find most profitable single trade (one buy, one sell) in stock price tick stream (with visualization)

# Stock tick stream
prices = [random.randint(0,100) for x in range(100)]

best_lo, best_hi = prices[0], prices[1]
best_lo_time, best_hi_time = 0, 1
new_lo, new_hi = best_lo, best_hi
new_lo_time, new_hi_time = best_lo_time, best_hi_time
best_profit = best_hi - best_lo

for time, price in enumerate(prices):
    out = "{:3d}".format(time) + " | "
    if price > new_hi and time != 0:
        new_hi, new_hi_time = price, time
        out += "H"
    else:
        out += "."

    if price < new_lo:
        new_lo, new_lo_time = price, time
        new_hi, new_hi_time = 0, time + 1
        out += "L"
    else:
        out += "."

    new_profit = new_hi - new_lo

    if new_profit > best_profit:
        best_profit = new_profit
        best_lo, best_lo_time = new_lo, new_lo_time
        best_hi, best_hi_time = new_hi, new_hi_time
        out += "P"
    else:
        out += "."

    print(out +" " + "*" * price)

print("==========")
print("buy@${}(t{}), sell@${}(t{}), profit: ${}".format(best_lo, best_lo_time, best_hi, best_hi_time, best_profit))

LINK FOR BWOB

http://192.168.0.60:8888/?token=dabf641130d9f1501944cc07bc1967fa17a27ac812a55751

Clone this wiki locally