-
Notifications
You must be signed in to change notification settings - Fork 22
Add Robinhood cryptocurrency data source #23
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 5 commits
286de58
201c64a
2b483d2
9718baa
bdf2072
6d6c2f9
9347ef2
8949a85
51177d0
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,6 @@ | ||
| # RobinhoodDataSource | ||
|
|
||
| > Requires the [`pynacl`](https://github.com/pyca/pynacl) library for cryptographic signing. You can install it manually: `pip install pynacl` | ||
| > or use `pip install pyspark-data-sources[robinhood]`. | ||
|
|
||
| ::: pyspark_datasources.robinhood.RobinhoodDataSource |
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,248 @@ | ||||||||||||||||||||
| from dataclasses import dataclass | ||||||||||||||||||||
| from typing import Dict | ||||||||||||||||||||
| import requests | ||||||||||||||||||||
| import json | ||||||||||||||||||||
| import base64 | ||||||||||||||||||||
| import datetime | ||||||||||||||||||||
|
|
||||||||||||||||||||
| from pyspark.sql import Row | ||||||||||||||||||||
| from pyspark.sql.types import StructType | ||||||||||||||||||||
| from pyspark.sql.datasource import DataSource, DataSourceReader, InputPartition | ||||||||||||||||||||
|
|
||||||||||||||||||||
|
|
||||||||||||||||||||
| class RobinhoodDataSource(DataSource): | ||||||||||||||||||||
| """ | ||||||||||||||||||||
| A data source for reading cryptocurrency data from Robinhood Crypto API. | ||||||||||||||||||||
| This data source allows you to fetch real-time cryptocurrency market data, | ||||||||||||||||||||
| trading pairs, and price information using Robinhood's official Crypto API. | ||||||||||||||||||||
| It implements proper API key authentication and signature-based security. | ||||||||||||||||||||
| Name: `robinhood` | ||||||||||||||||||||
| Schema: `symbol string, price double, bid_price double, ask_price double, updated_at string` | ||||||||||||||||||||
| Examples | ||||||||||||||||||||
| -------- | ||||||||||||||||||||
| Register the data source: | ||||||||||||||||||||
| >>> from pyspark_datasources import RobinhoodDataSource | ||||||||||||||||||||
| >>> spark.dataSource.register(RobinhoodDataSource) | ||||||||||||||||||||
| Load cryptocurrency market data with API authentication: | ||||||||||||||||||||
| >>> df = spark.read.format("robinhood") \\ | ||||||||||||||||||||
| ... .option("api_key", "your-api-key") \\ | ||||||||||||||||||||
| ... .option("private_key", "your-base64-private-key") \\ | ||||||||||||||||||||
| ... .load("BTC-USD,ETH-USD,DOGE-USD") | ||||||||||||||||||||
| >>> df.show() | ||||||||||||||||||||
| +--------+--------+---------+---------+--------------------+ | ||||||||||||||||||||
| | symbol| price|bid_price|ask_price| updated_at| | ||||||||||||||||||||
| +--------+--------+---------+---------+--------------------+ | ||||||||||||||||||||
| |BTC-USD |45000.50|45000.25 |45000.75 |2024-01-15T16:00:...| | ||||||||||||||||||||
| |ETH-USD | 2650.75| 2650.50 | 2651.00 |2024-01-15T16:00:...| | ||||||||||||||||||||
| |DOGE-USD| 0.085| 0.084| 0.086|2024-01-15T16:00:...| | ||||||||||||||||||||
| +--------+--------+---------+---------+--------------------+ | ||||||||||||||||||||
| Notes | ||||||||||||||||||||
| ----- | ||||||||||||||||||||
| - Requires valid Robinhood Crypto API credentials (API key and base64-encoded private key) | ||||||||||||||||||||
| - Supports all major cryptocurrencies available on Robinhood | ||||||||||||||||||||
| - Implements proper API authentication with NaCl (Sodium) signing | ||||||||||||||||||||
| - Rate limiting is handled automatically | ||||||||||||||||||||
| - Based on official Robinhood Crypto Trading API documentation | ||||||||||||||||||||
| - Requires 'pynacl' library for cryptographic signing: pip install pynacl | ||||||||||||||||||||
| - Reference: https://docs.robinhood.com/crypto/trading/ | ||||||||||||||||||||
| """ | ||||||||||||||||||||
coderabbitai[bot] marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||||||||||||||||||||
|
|
||||||||||||||||||||
| @classmethod | ||||||||||||||||||||
| def name(cls) -> str: | ||||||||||||||||||||
| return "robinhood" | ||||||||||||||||||||
|
|
||||||||||||||||||||
| def schema(self) -> str: | ||||||||||||||||||||
| return ( | ||||||||||||||||||||
| "symbol string, price double, bid_price double, ask_price double, " | ||||||||||||||||||||
| "updated_at string" | ||||||||||||||||||||
| ) | ||||||||||||||||||||
|
||||||||||||||||||||
|
|
||||||||||||||||||||
| def reader(self, schema: StructType): | ||||||||||||||||||||
| return RobinhoodDataReader(schema, self.options) | ||||||||||||||||||||
|
|
||||||||||||||||||||
|
|
||||||||||||||||||||
| @dataclass | ||||||||||||||||||||
| class CryptoPair(InputPartition): | ||||||||||||||||||||
| """Represents a single crypto trading pair partition for parallel processing.""" | ||||||||||||||||||||
| symbol: str | ||||||||||||||||||||
|
|
||||||||||||||||||||
|
|
||||||||||||||||||||
| class RobinhoodDataReader(DataSourceReader): | ||||||||||||||||||||
| """Reader implementation for Robinhood Crypto API data source.""" | ||||||||||||||||||||
|
|
||||||||||||||||||||
| def __init__(self, schema: StructType, options: Dict): | ||||||||||||||||||||
| self.schema = schema | ||||||||||||||||||||
| self.options = options | ||||||||||||||||||||
|
|
||||||||||||||||||||
| # Required API authentication | ||||||||||||||||||||
| self.api_key = options.get("api_key") | ||||||||||||||||||||
| self.private_key_base64 = options.get("private_key") | ||||||||||||||||||||
|
|
||||||||||||||||||||
| if not self.api_key or not self.private_key_base64: | ||||||||||||||||||||
| raise ValueError( | ||||||||||||||||||||
| "Robinhood Crypto API requires both 'api_key' and 'private_key' options. " | ||||||||||||||||||||
| "The private_key should be base64-encoded. " | ||||||||||||||||||||
| "Get your API credentials from https://docs.robinhood.com/crypto/trading/" | ||||||||||||||||||||
| ) | ||||||||||||||||||||
|
|
||||||||||||||||||||
| # Initialize NaCl signing key | ||||||||||||||||||||
| try: | ||||||||||||||||||||
| from nacl.signing import SigningKey | ||||||||||||||||||||
| private_key_seed = base64.b64decode(self.private_key_base64) | ||||||||||||||||||||
| self.signing_key = SigningKey(private_key_seed) | ||||||||||||||||||||
| except ImportError: | ||||||||||||||||||||
| raise ImportError( | ||||||||||||||||||||
| "PyNaCl library is required for Robinhood Crypto API authentication. " | ||||||||||||||||||||
| "Install it with: pip install pynacl" | ||||||||||||||||||||
| ) | ||||||||||||||||||||
| except Exception as e: | ||||||||||||||||||||
| raise ValueError(f"Invalid private key format: {str(e)}") | ||||||||||||||||||||
|
|
||||||||||||||||||||
|
|
||||||||||||||||||||
|
|
||||||||||||||||||||
|
|
||||||||||||||||||||
| # Crypto API base URL | ||||||||||||||||||||
| self.base_url = "https://trading.robinhood.com" | ||||||||||||||||||||
|
||||||||||||||||||||
| # Crypto API base URL | |
| self.base_url = "https://trading.robinhood.com" | |
| # Crypto API base URL | |
| self.base_url = "https://trading.robinhood.com" | |
| # Initialize session for connection pooling (lazily initialized) | |
| self._session = None |
🤖 Prompt for AI Agents
In pyspark_datasources/robinhood.py around lines 53 to 57, the class sets
self.base_url but does not initialize an HTTP session or handle cleanup; add a
lazily-initialized requests.Session stored as self._session (create it only when
first needed for making requests), update all request calls to use this session
for connection pooling, and provide a public close() method plus context-manager
support (__enter__/__exit__) or a __del__ fallback to call close() so the
session is properly closed when the client is disposed.
Yicong-Huang marked this conversation as resolved.
Show resolved
Hide resolved
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we add type hint here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
added. also added type hints for other methods.
Yicong-Huang marked this conversation as resolved.
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
does it support passing timestamp to query for historical data?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
unfortunately, it is not supported at the moment.
Uh oh!
There was an error while loading. Please reload this page.