Real-time News Data Stream via WebSocket Subscription #392

Merged
merged 36 commits into from
Jul 1, 2024
Changes from 25 commits
Commits
5cbc8d6
Added News Stream
IsaacTrevino Oct 20, 2023
2a66fd6
updated readme
IsaacTrevino Oct 20, 2023
35b298f
Remove NEWS_DATA_STREAM
IsaacTrevino Oct 20, 2023
1245a57
poetry pre-commit
IsaacTrevino Oct 20, 2023
78fa279
update readme
IsaacTrevino Oct 20, 2023
75eba6a
test_cast
IsaacTrevino Oct 21, 2023
bef4760
test_cast
IsaacTrevino Oct 21, 2023
48d50f2
test_cast
IsaacTrevino Oct 21, 2023
710ef11
test_cast
IsaacTrevino Oct 21, 2023
c06a16c
test_cast
IsaacTrevino Oct 21, 2023
b9bde00
test_cast
IsaacTrevino Oct 22, 2023
5308187
Merge pull request #1 from IsaacTrevino:test_cast
IsaacTrevino Oct 22, 2023
b4feec9
clean up test_case.py imports
IsaacTrevino Oct 22, 2023
2dc6313
Merge branch 'alpacahq:master' into master
IsaacTrevino Oct 22, 2023
8a8456d
fixed news model issues
IsaacTrevino Oct 22, 2023
8aba3d9
Merge branch 'master' of https://github.com/IsaacTrevino/alpaca-py
IsaacTrevino Oct 22, 2023
a0f3dad
news websocket.py _dispatch fix
IsaacTrevino Oct 22, 2023
3d0918c
websocket raw_data=false testing
IsaacTrevino Oct 23, 2023
6b0cb4b
fixed timestamp to datetime in news ws
IsaacTrevino Oct 23, 2023
c7c4286
remove any
IsaacTrevino Oct 23, 2023
a3e9396
revert NewsClient namechange
IsaacTrevino Oct 23, 2023
16eb391
Added TimeSeriesMixin to News model
IsaacTrevino Oct 24, 2023
4924843
set news.images default none
IsaacTrevino Oct 24, 2023
1c81fc9
Update README.md
IsaacTrevino Nov 12, 2023
aacc3f3
Merge branch 'alpacahq:master' into news-stream
IsaacTrevino Dec 25, 2023
9019273
resolved comments
IsaacTrevino Dec 27, 2023
a1b8af2
remove test timestamp
IsaacTrevino Dec 27, 2023
f46dc51
instatiate handler per symbol
IsaacTrevino Dec 27, 2023
17206fe
handler fix
IsaacTrevino Jan 29, 2024
8d307cd
Merge branch 'master' into news-stream
IsaacTrevino Jan 29, 2024
c9ed276
duplicate calls
IsaacTrevino Jan 29, 2024
1a73ed8
Merge branch 'alpacahq:master' into news-stream
IsaacTrevino Feb 3, 2024
953de59
Merge branch 'master' into news-stream
IsaacTrevino Apr 9, 2024
fa8d7a9
Merge branch 'master' into news-stream
hiohiohio Jul 1, 2024
28b57f6
adjust code to merge
hiohiohio Jul 1, 2024
104b736
fix adjusted code
hiohiohio Jul 1, 2024
37 changes: 33 additions & 4 deletions README.md
@@ -27,7 +27,7 @@

## About <a name="about"></a>

Alpaca-py provides an interface for interacting with the API products Alpaca offers. These API products are provided as various REST, WebSocket and SSE endpoints that allow you to do everything from streaming market data to creating your own investment apps.
Alpaca-py provides an interface for interacting with the API products Alpaca offers. These API products are provided as various REST, WebSocket and SSE endpoints that allow you to do everything from streaming market data to creating your own investment apps.

Learn more about the API products Alpaca offers at https://alpaca.markets.

@@ -65,9 +65,9 @@ If you’ve used the previous python SDK alpaca-trade-api, there are a few key d
Alpaca-py lets you use Broker API to start building your investment apps! Learn more at the [Broker](https://docs.alpaca.markets/docs/broker-api) page.

### OOP Design <a name="oop-design"></a>
Alpaca-py uses a more OOP approach to submitting requests compared to the previous SDK. To submit a request, you will most likely need to create a request object containing the desired request data. Generally, there is a unique request model for each method.
Alpaca-py uses a more OOP approach to submitting requests compared to the previous SDK. To submit a request, you will most likely need to create a request object containing the desired request data. Generally, there is a unique request model for each method.

Some examples of request models corresponding to methods:
Some examples of request models corresponding to methods:

* ``GetOrdersRequest`` for ``TradingClient.get_orders()``
* ``CryptoLatestOrderbookRequest`` for ``CryptoHistoricalDataClient.get_crypto_latest_orderbook()``
@@ -122,10 +122,12 @@ Alpaca-py has a lot of client classes. There is a client for each API and even a

**Market Data API:** ``StockHistoricalDataClient``, ``CryptoHistoricalDataClient``, ``CryptoDataStream``, ``StockDataStream``

**News API:** ``NewsClient``, ``NewsDataStream``

## API Keys <a name="api-keys"></a>

### Trading and Market Data API <a name="trading-api-keys"></a>
In order to use Alpaca’s services you’ll need to sign up for an Alpaca account and retrieve your API keys. Signing up is completely free and takes only a few minutes. Sandbox environments are available to test out the API. To use the sandbox environment, you will need to provide sandbox/paper keys. API keys are passed into Alpaca-py through either ``TradingClient``, ``StockHistoricalDataClient``, ``CryptoHistoricalDataClient``, ``StockDataStream``, or ``CryptoDataStream``.
In order to use Alpaca’s services you’ll need to sign up for an Alpaca account and retrieve your API keys. Signing up is completely free and takes only a few minutes. Sandbox environments are available to test out the API. To use the sandbox environment, you will need to provide sandbox/paper keys. API keys are passed into Alpaca-py through either ``TradingClient``, ``StockHistoricalDataClient``, ``CryptoHistoricalDataClient``, ``StockDataStream``, ``CryptoDataStream``, ``NewsClient``, or ``NewsDataStream``.

### Broker API <a name="broker-api-keys"></a>
To use the Broker API, you will need to sign up for a broker account and retrieve your Broker API keys. The API keys can be found on the dashboard once you’ve logged in. Alpaca also provides a sandbox environment to test out Broker API. To use the sandbox mode, provide your sandbox keys. Once you have your keys, you can pass them into ``BrokerClient`` to get started.
@@ -137,6 +139,8 @@ To view full descriptions and examples view the [documentation page](https://doc

**Market Data API**: Access live and historical market data for 5000+ stocks and 20+ crypto.

**News API**: Access live and historical news data for stocks and crypto.

**Trading API**: Trade stock and crypto with lightning fast execution speeds.

**Broker API & Connect**: Build investment apps - from robo-advisors to brokerages.
@@ -220,3 +224,28 @@ bars = client.get_crypto_bars(request_params)
bars.df

```

### News API Example <a name="news-api-example"></a>
**Querying News Data**

You can query news data via the ``NewsClient``. In this example, we query news data for “TSLA” since July 1st, 2022. You can convert the response to a pandas DataFrame using the ``.df`` property.

```python
from alpaca.data.historical.news import NewsClient
from alpaca.data.requests import NewsRequest
from datetime import datetime

# no keys required for news data
client = NewsClient()

request_params = NewsRequest(
    symbols="TSLA",
    start=datetime.strptime("2022-07-01", '%Y-%m-%d')
)

news = client.get_news(request_params)

# convert to dataframe
news.df

```
59 changes: 51 additions & 8 deletions alpaca/common/websocket.py
@@ -1,16 +1,19 @@
import asyncio
from datetime import datetime
import logging
import queue
from collections import defaultdict
from typing import Callable, Dict, Optional, Union, Tuple

from pandas import Timestamp

import msgpack
import websockets
from pydantic import BaseModel
from alpaca import __version__

from alpaca.common.types import RawData
from alpaca.data.models import Bar, Quote, Trade
from alpaca.data.models import Bar, Quote, Trade, News

log = logging.getLogger(__name__)

@@ -51,6 +54,7 @@ def __init__(
"bars": {},
"updatedBars": {},
"dailyBars": {},
"news": {},
}
self._name = "data"
self._should_run = True
@@ -166,17 +170,23 @@ def _cast(self, msg_type: str, msg: Dict) -> Union[BaseModel, RawData]:
if "t" in msg:
msg["t"] = msg["t"].to_datetime()

if "S" not in msg:
symbol = msg.get("S", msg.get("symbols", None))
if symbol is None:
return msg

if msg_type == "t":
result = Trade(msg["S"], msg)
result = Trade(symbol, msg)

elif msg_type == "q":
result = Quote(msg["S"], msg)
result = Quote(symbol, msg)

elif msg_type in ("b", "u", "d"):
result = Bar(msg["S"], msg)
result = Bar(symbol, msg)

elif msg_type == "n":
msg["created_at"] = msg["created_at"].to_datetime()
msg["updated_at"] = msg["updated_at"].to_datetime()
result = News(msg)

return result

@@ -187,7 +197,8 @@ async def _dispatch(self, msg: Dict) -> None:
msg (Dict): The message from the websocket connection
"""
msg_type = msg.get("T")
symbol = msg.get("S")
symbol = msg.get("S", msg.get("symbols", "*"))

if msg_type == "t":
handler = self._handlers["trades"].get(
symbol, self._handlers["trades"].get("*", None)
@@ -218,6 +229,13 @@ async def _dispatch(self, msg: Dict) -> None:
)
if handler:
await handler(self._cast(msg_type, msg))
elif msg_type == "n":
symbol = ",".join(symbol)
handler = self._handlers["news"].get(
symbol, self._handlers["news"].get("*", None)
)
if handler:
await handler(self._cast(msg_type, msg))
elif msg_type == "subscription":
sub = [f"{k}: {msg.get(k, [])}" for k in self._handlers]
log.info(f'subscribed to {", ".join(sub)}')
@@ -247,6 +265,7 @@ async def _subscribe_all(self) -> None:
if k not in ("cancelErrors", "corrections") and v:
for s in v.keys():
msg[k].append(s)

msg["action"] = "subscribe"
bs = msgpack.packb(msg)
frames = (
@@ -256,7 +275,7 @@ async def _subscribe_all(self) -> None:
await self._ws.send(frames)

async def _unsubscribe(
self, trades=(), quotes=(), bars=(), updated_bars=(), daily_bars=()
self, trades=(), quotes=(), bars=(), updated_bars=(), daily_bars=(), news=()
) -> None:
"""Unsubscribes from data for symbols specified by the data type
we want to unsubscribe from.
@@ -267,8 +286,9 @@ async def _unsubscribe(
bars (tuple, optional): All symbols to unsubscribe minute bar data for. Defaults to ().
updated_bars (tuple, optional): All symbols to unsubscribe updated bar data for. Defaults to ().
daily_bars (tuple, optional): All symbols to unsubscribe daily bar data for. Defaults to ().
news (tuple, optional): All symbols to unsubscribe news data for. Defaults to ().
"""
if trades or quotes or bars or updated_bars or daily_bars:
if trades or quotes or bars or updated_bars or daily_bars or news:
await self._ws.send(
msgpack.packb(
{
@@ -278,6 +298,7 @@ async def _unsubscribe(
"bars": bars,
"updatedBars": updated_bars,
"dailyBars": daily_bars,
"news": news,
}
)
)
@@ -370,6 +391,15 @@ def subscribe_daily_bars(self, handler: Callable, *symbols) -> None:
"""
self._subscribe(handler, symbols, self._handlers["dailyBars"])

def subscribe_news(self, handler: Callable, *symbols) -> None:
"""Subscribe to news data for symbol inputs

Args:
handler (Callable): The coroutine callback function to handle live news data
*symbols: Variable string arguments for ticker identifiers to be subscribed to.
"""
self._subscribe(handler, symbols, self._handlers["news"])

def unsubscribe_trades(self, *symbols) -> None:
"""Unsubscribe from trade data for symbol inputs

@@ -435,6 +465,19 @@ def unsubscribe_daily_bars(self, *symbols) -> None:
for symbol in symbols:
del self._handlers["dailyBars"][symbol]

def unsubscribe_news(self, *symbols) -> None:
"""Unsubscribe from news data for symbol inputs

Args:
*symbols: Variable string arguments for ticker identifiers to be unsubscribed from.
"""
if self._running:
asyncio.run_coroutine_threadsafe(
self._unsubscribe(news=symbols), self._loop
).result()
for symbol in symbols:
del self._handlers["news"][symbol]

def run(self) -> None:
"""Starts up the websocket connection's event loop"""
try:
Expand Down
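The news branch of `_dispatch` above looks up a handler by joining the message's `symbols` list into one comma-separated key and falls back to the `"*"` wildcard. The following is a minimal, self-contained sketch of that lookup as the diff reads, not the library's code: `find_news_handler`, `on_tsla`, `on_any`, and the `headline` field are illustrative names, and no WebSocket connection is involved.

```python
import asyncio
from typing import Awaitable, Callable, Dict, List, Optional

Handler = Callable[[dict], Awaitable[None]]


def find_news_handler(handlers: Dict[str, Handler], msg: dict) -> Optional[Handler]:
    """Mimic the news branch of _dispatch: join the symbols, then fall back to "*"."""
    symbols = msg.get("S", msg.get("symbols", "*"))
    key = ",".join(symbols)  # ["AAPL", "TSLA"] -> "AAPL,TSLA"; "*" stays "*"
    return handlers.get(key, handlers.get("*"))


received: List[str] = []


async def on_tsla(msg: dict) -> None:
    # Hypothetical per-symbol handler
    received.append("tsla:" + msg["headline"])


async def on_any(msg: dict) -> None:
    # Hypothetical wildcard handler
    received.append("any:" + msg["headline"])


async def main() -> None:
    handlers = {"TSLA": on_tsla, "*": on_any}
    for msg in (
        {"T": "n", "symbols": ["TSLA"], "headline": "a"},
        {"T": "n", "symbols": ["AAPL", "TSLA"], "headline": "b"},
    ):
        handler = find_news_handler(handlers, msg)
        if handler:
            await handler(msg)


asyncio.run(main())
```

Under this reading, an article tagged with several symbols produces the key `"AAPL,TSLA"`, so it reaches a handler registered for `"TSLA"` alone only through the wildcard. Whether that is the intended behavior is not stated in the diff.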
2 changes: 2 additions & 0 deletions alpaca/data/enums.py
@@ -60,11 +60,13 @@ class DataFeed(str, Enum):
IEX (str): Investor's exchange data feed
SIP (str): Securities Information Processor feed
OTC (str): Over the counter feed
NEWS (str): News feed
"""

IEX = "iex"
SIP = "sip"
OTC = "otc"
NEWS = "news"


class Adjustment(str, Enum):
92 changes: 84 additions & 8 deletions alpaca/data/historical/news.py
@@ -1,12 +1,15 @@
from typing import Optional, Union
from collections import defaultdict
from enum import Enum
from typing import Any, Optional, Union, List
from alpaca.common.constants import DATA_V2_MAX_LIMIT

from alpaca.common.rest import RESTClient

from alpaca.common.enums import BaseURL

from alpaca.data.requests import NewsRequest

from alpaca.data.models.news import NewsSet
from alpaca.data.models import NewsSet

from alpaca.common.types import RawData

@@ -54,12 +57,85 @@ def get_news(self, request_params: NewsRequest) -> Union[RawData, NewsSet]:
"""Returns news data

Args:
request_params (NewsRequest): The request params to filter the news data"""
response = self.get(
path="/news",
data=request_params.to_request_fields(),
request_params (NewsRequest): The request params to filter the news data
Returns:
Union[RawData, NewsSet]: The news data
"""

params = request_params.to_request_fields()
# paginated get request for news data api
raw_news = self._data_get(
symbol_or_symbols=request_params.symbols,
**params,
)

if self._use_raw_data:
return response
return raw_news

return NewsSet(raw_news)

# TODO: Remove duplication
def _data_get(
self,
symbol_or_symbols: Union[str, List[str]],
endpoint_asset_class: str = "news",
api_version: str = "v1beta1",
limit: Optional[int] = None,
page_limit: int = DATA_V2_MAX_LIMIT,
**kwargs,
) -> RawData:
"""Performs Data API GET requests accounting for pagination. Data in responses are limited to the page_limit,
which defaults to 10,000 items. If any more data is requested, the data will be paginated.

Args:
symbol_or_symbols (Union[str, List[str]]): The symbol or list of symbols that we want to query for
api_version (str): Data API version. Defaults to "v1beta1".
endpoint_asset_class (str): The data API security type path. Defaults to 'news'.
limit (Optional[int]): The maximum number of items to query. Defaults to None.
page_limit (int): The maximum number of items returned per page, which is different from limit. Defaults to DATA_V2_MAX_LIMIT.

Returns:
RawData: Raw News Market data from API
"""
# params contains the payload data
params = kwargs

# stocks, crypto, etc
path = f"/{endpoint_asset_class}"

params["symbols"] = symbol_or_symbols

data_by_symbol = defaultdict(list)

total_items = 0
page_token = None

while True:
actual_limit = None

# adjusts the limit parameter value if it is over the page_limit
if limit:
# actual_limit is the adjusted total number of items to query per request
actual_limit = min(int(limit) - total_items, page_limit)
if actual_limit < 1:
break

params["limit"] = actual_limit
params["page_token"] = page_token

response = self.get(path=path, data=params, api_version=api_version)

data_by_symbol["news"].extend(response["news"])

# if we've sent a request with a limit, increment count
if actual_limit:
total_items += actual_limit

page_token = response.get("next_page_token", None)

if page_token is None:
break

return NewsSet(**response)
# users receive Type dict
return dict(data_by_symbol)
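The `_data_get` loop above follows `next_page_token` until the API stops returning one or the requested `limit` is exhausted. Below is a minimal sketch of the same pattern with the HTTP call replaced by a stub. `paginate_news` and `fake_fetch` are hypothetical names, the page size of 2 is made up, and the sketch counts the articles actually returned rather than the requested limit, which is a deliberate deviation from the diff.

```python
from typing import Callable, Dict, List, Optional


def paginate_news(
    fetch: Callable[[Dict], Dict],
    limit: Optional[int] = None,
    page_limit: int = 50,
) -> List[dict]:
    """Collect news items page by page until the token runs out or limit is met."""
    items: List[dict] = []
    page_token: Optional[str] = None

    while True:
        params: Dict = {"page_token": page_token}

        if limit is not None:
            # Ask only for what is still missing, capped at the page size
            remaining = limit - len(items)
            if remaining < 1:
                break
            params["limit"] = min(remaining, page_limit)

        response = fetch(params)
        items.extend(response["news"])

        page_token = response.get("next_page_token")
        if page_token is None:
            break

    return items


def fake_fetch(params: Dict) -> Dict:
    """Hypothetical stand-in for the HTTP call: 5 articles, at most 2 per page."""
    data = [{"id": i} for i in range(5)]
    start = int(params["page_token"] or 0)
    size = min(params.get("limit", 2), 2)
    end = start + size
    return {
        "news": data[start:end],
        "next_page_token": str(end) if end < len(data) else None,
    }


all_items = paginate_news(fake_fetch)
first_three = paginate_news(fake_fetch, limit=3)
```

Counting returned items matters when the server hands back fewer articles than requested on a page. Incrementing by the requested amount, as the diff does with `total_items += actual_limit`, could end the loop early in that case.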
49 changes: 49 additions & 0 deletions alpaca/data/live/news.py
@@ -0,0 +1,49 @@
from typing import Optional, Dict

from alpaca.common.enums import BaseURL
from alpaca.common.websocket import BaseStream
from alpaca.data.enums import DataFeed


class NewsDataStream(BaseStream):
"""
A WebSocket client for streaming the live news feed from the Alpaca Data API.

See BaseStream for more information on implementation and the methods available.
"""

def __init__(
self,
api_key: str,
secret_key: str,
raw_data: bool = False,
feed: DataFeed = DataFeed.NEWS,
websocket_params: Optional[Dict] = None,
url_override: Optional[str] = None,
) -> None:
"""
Instantiates a WebSocket client for accessing live news data.

Args:
api_key (str): Alpaca API key.
secret_key (str): Alpaca API secret key.
raw_data (bool, optional): Whether to return wrapped data or raw API data. Defaults to False.
feed (DataFeed, optional): Which news data feed to use. Defaults to DataFeed.NEWS.
websocket_params (Optional[Dict], optional): Any parameters for configuring websocket connection. Defaults to None.
url_override (Optional[str]): If specified allows you to override the base url the client
points to for proxy/testing. Defaults to None.

Raises:
ValueError: Only News market data feeds are supported
"""
super().__init__(
endpoint=(
url_override
if url_override is not None
else BaseURL.MARKET_DATA_STREAM.value + "/v1beta1/" + feed.value
),
api_key=api_key,
secret_key=secret_key,
raw_data=raw_data,
websocket_params=websocket_params,
)
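The constructor above resolves the stream endpoint either from `url_override` or by appending `/v1beta1/` and the feed name to the market data stream base URL. A small sketch of that resolution follows. The value of `MARKET_DATA_STREAM` is an assumption, since `BaseURL.MARKET_DATA_STREAM` is not shown in this diff, and `resolve_news_endpoint` is a hypothetical helper.

```python
from typing import Optional

# Assumed value of BaseURL.MARKET_DATA_STREAM; it is not shown in this diff.
MARKET_DATA_STREAM = "wss://stream.data.alpaca.markets"


def resolve_news_endpoint(
    feed: str = "news", url_override: Optional[str] = None
) -> str:
    """Return the override when given, else base URL + /v1beta1/ + feed."""
    if url_override is not None:
        return url_override
    return MARKET_DATA_STREAM + "/v1beta1/" + feed


default_endpoint = resolve_news_endpoint()
proxy_endpoint = resolve_news_endpoint(url_override="ws://localhost:8765")
```

Checking `url_override is not None` rather than its truthiness means an empty string would still be treated as an override, which matches the condition in the diff.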