How to Fetch Bitcoin Prices in Python: API Tutorial
Learn how to fetch Bitcoin prices in Python using free cryptocurrency APIs. This complete tutorial covers everything from simple price lookups to building price alert systems, analyzing historical data with pandas, creating visualizations with matplotlib, and implementing real-time monitoring with WebSockets. All code examples are production-ready and include error handling.
Python is the most popular language for cryptocurrency development, offering excellent libraries for API integration, data analysis, and visualization. Whether you're building a trading bot, portfolio tracker, or research tool, fetching Bitcoin prices programmatically is a fundamental skill.
This tutorial uses free APIs (CoinGecko, Binance, CoinMarketCap) and popular Python libraries (requests, pandas, matplotlib). By the end, you'll have a complete toolkit for working with Bitcoin price data in Python.
1. Setting Up Your Environment
Installing Required Libraries
# Create virtual environment (recommended)
python -m venv btc_env
source btc_env/bin/activate # On Windows: btc_env\Scripts\activate
# Install required packages
pip install requests pandas matplotlib python-dotenv websocket-client
# Optional: for advanced features
pip install redis schedule plotly
Project Structure
bitcoin-price-tracker/
├── btc_api.py # API integration module
├── alerts.py # Price alert system
├── analysis.py # Data analysis with pandas
├── visualize.py # Charts and graphs
├── realtime.py # WebSocket implementation
├── config.py # Configuration settings
├── .env # API keys (gitignore this)
└── requirements.txt # Dependencies
Best Practice: Use a virtual environment to isolate project dependencies. Store API keys in .env files (never commit to git). Use type hints for better code documentation.
2. Fetching Current Bitcoin Price
Simple Price Lookup
# btc_api.py - Basic Bitcoin price fetcher
import requests
from typing import Dict, Optional
from datetime import datetime
class BitcoinPriceAPI:
"""
Fetch Bitcoin prices from multiple APIs with fallback.
"""
def __init__(self):
self.coingecko_url = "https://api.coingecko.com/api/v3"
self.binance_url = "https://api.binance.com/api/v3"
def get_price_coingecko(self, currency: str = 'usd') -> Optional[Dict]:
"""
Get Bitcoin price from CoinGecko (no API key needed).
"""
try:
url = f"{self.coingecko_url}/simple/price"
params = {
'ids': 'bitcoin',
'vs_currencies': currency,
'include_24hr_change': 'true',
'include_market_cap': 'true',
'include_24hr_vol': 'true'
}
response = requests.get(url, params=params, timeout=5)
response.raise_for_status()
data = response.json()['bitcoin']
return {
'price': data[currency],
'change_24h': data.get(f'{currency}_24h_change'),
'market_cap': data.get(f'{currency}_market_cap'),
'volume_24h': data.get(f'{currency}_24h_vol'),
'currency': currency.upper(),
'source': 'CoinGecko',
'timestamp': datetime.utcnow()
}
except requests.RequestException as e:
print(f"CoinGecko API error: {e}")
return None
def get_price_binance(self) -> Optional[Dict]:
"""
Get Bitcoin price from Binance (USDT pair).
"""
try:
url = f"{self.binance_url}/ticker/24hr"
params = {'symbol': 'BTCUSDT'}
response = requests.get(url, params=params, timeout=5)
response.raise_for_status()
data = response.json()
return {
'price': float(data['lastPrice']),
'change_24h': float(data['priceChangePercent']),
'high_24h': float(data['highPrice']),
'low_24h': float(data['lowPrice']),
'volume_24h': float(data['volume']),
'currency': 'USD',
'source': 'Binance',
'timestamp': datetime.utcnow()
}
except requests.RequestException as e:
print(f"Binance API error: {e}")
return None
def get_price(self, currency: str = 'usd') -> Optional[Dict]:
"""
Get Bitcoin price with automatic fallback.
Tries CoinGecko first, falls back to Binance.
"""
# Try CoinGecko
price_data = self.get_price_coingecko(currency)
if price_data:
return price_data
# Fallback to Binance (USD only)
if currency.lower() == 'usd':
return self.get_price_binance()
return None
def format_price(self, price_data: Dict) -> str:
"""
Format price data for display.
"""
if not price_data:
return "Price unavailable"
price = price_data['price']
change = price_data.get('change_24h', 0)
currency = price_data['currency']
source = price_data['source']
change_symbol = "+" if change > 0 else ""
return (f"BTC/{currency}: ${price:,.2f} "
f"({change_symbol}{change:.2f}% 24h) "
f"[{source}]")
# Usage example
if __name__ == "__main__":
api = BitcoinPriceAPI()
# Get current price
btc_price = api.get_price('usd')
print(api.format_price(btc_price))
# Get price in multiple currencies
for currency in ['usd', 'eur', 'gbp']:
price_data = api.get_price(currency)
if price_data:
print(api.format_price(price_data))
# Output:
# BTC/USD: $52,431.00 (+2.34% 24h) [CoinGecko]
# BTC/EUR: $48,123.00 (+2.31% 24h) [CoinGecko]
# BTC/GBP: $41,567.00 (+2.29% 24h) [CoinGecko]
Error Handling: Always wrap API calls in try-except blocks. Network requests can fail for many reasons: timeouts, rate limits, API downtime. Implement fallback mechanisms for production use.
3. Getting Historical BTC Data
Fetch Historical Prices
import requests
import pandas as pd
from datetime import datetime, timedelta
from typing import List, Dict
class BitcoinHistoricalData:
"""
Fetch and process historical Bitcoin price data.
"""
def __init__(self):
self.coingecko_url = "https://api.coingecko.com/api/v3"
def get_historical_prices(self, days: int = 30, currency: str = 'usd') -> pd.DataFrame:
"""
Get historical Bitcoin prices for the last N days.
Returns pandas DataFrame with OHLC data.
"""
try:
url = f"{self.coingecko_url}/coins/bitcoin/market_chart"
params = {
'vs_currency': currency,
'days': days,
'interval': 'daily'
}
response = requests.get(url, params=params, timeout=10)
response.raise_for_status()
data = response.json()
# Convert to DataFrame
df = pd.DataFrame({
'timestamp': [x[0] for x in data['prices']],
'price': [x[1] for x in data['prices']],
'market_cap': [x[1] for x in data['market_caps']],
'volume': [x[1] for x in data['total_volumes']]
})
# Convert timestamp to datetime
df['date'] = pd.to_datetime(df['timestamp'], unit='ms')
df.set_index('date', inplace=True)
df.drop('timestamp', axis=1, inplace=True)
return df
except requests.RequestException as e:
print(f"Error fetching historical data: {e}")
return pd.DataFrame()
def get_ohlc_data(self, days: int = 90) -> pd.DataFrame:
"""
Get OHLC (Open, High, Low, Close) candlestick data from Binance.
"""
try:
url = "https://api.binance.com/api/v3/klines"
params = {
'symbol': 'BTCUSDT',
'interval': '1d', # Daily candles
'limit': days
}
response = requests.get(url, params=params, timeout=10)
response.raise_for_status()
klines = response.json()
# Parse kline data
df = pd.DataFrame(klines, columns=[
'timestamp', 'open', 'high', 'low', 'close', 'volume',
'close_time', 'quote_volume', 'trades', 'taker_buy_base',
'taker_buy_quote', 'ignore'
])
# Convert to proper types
df['date'] = pd.to_datetime(df['timestamp'], unit='ms')
df.set_index('date', inplace=True)
# Keep only OHLCV columns and convert to float
df = df[['open', 'high', 'low', 'close', 'volume']].astype(float)
return df
except requests.RequestException as e:
print(f"Error fetching OHLC data: {e}")
return pd.DataFrame()
def calculate_returns(self, df: pd.DataFrame) -> pd.DataFrame:
"""
Calculate daily and cumulative returns.
"""
df = df.copy()
# Daily returns
df['daily_return'] = df['close'].pct_change()
# Cumulative returns
df['cumulative_return'] = (1 + df['daily_return']).cumprod() - 1
return df
def calculate_moving_averages(self, df: pd.DataFrame) -> pd.DataFrame:
"""
Add moving averages to price data.
"""
df = df.copy()
# Common moving averages
df['MA_7'] = df['close'].rolling(window=7).mean()
df['MA_25'] = df['close'].rolling(window=25).mean()
df['MA_99'] = df['close'].rolling(window=99).mean()
return df
# Usage example
if __name__ == "__main__":
historical = BitcoinHistoricalData()
# Get last 30 days of prices
df = historical.get_historical_prices(days=30)
print("Last 30 days:")
print(df.tail())
print(f"\nAverage price: ${df['price'].mean():,.2f}")
print(f"Highest price: ${df['price'].max():,.2f}")
print(f"Lowest price: ${df['price'].min():,.2f}")
# Get OHLC data with moving averages
ohlc = historical.get_ohlc_data(days=90)
ohlc = historical.calculate_moving_averages(ohlc)
ohlc = historical.calculate_returns(ohlc)
print("\nLast 5 days OHLC:")
print(ohlc[['open', 'high', 'low', 'close', 'MA_7']].tail())
# Calculate volatility
volatility = ohlc['daily_return'].std() * (365 ** 0.5) # Annualized
print(f"\nAnnualized volatility: {volatility:.2%}")
Pandas Power: pandas makes it easy to analyze time-series data. Use rolling windows for moving averages, pct_change() for returns, and describe() for statistical summaries.
4. Building Price Alert System
Email Price Alerts
# alerts.py - Bitcoin price alert system
import time
import smtplib
from email.mime.text import MIMEText
from typing import List, Dict
from datetime import datetime
from btc_api import BitcoinPriceAPI
class PriceAlert:
"""
Monitor Bitcoin price and send email alerts.
"""
def __init__(self, email_config: Dict):
self.api = BitcoinPriceAPI()
self.email_config = email_config
self.alerts = []
self.last_price = None
def add_alert(self, alert_type: str, threshold: float, currency: str = 'usd'):
"""
Add a price alert.
alert_type: 'above', 'below', 'change_percent'
"""
self.alerts.append({
'type': alert_type,
'threshold': threshold,
'currency': currency,
'triggered': False
})
def check_alerts(self) -> List[Dict]:
"""
Check all alerts and return triggered ones.
"""
current_data = self.api.get_price()
if not current_data:
return []
current_price = current_data['price']
triggered_alerts = []
for alert in self.alerts:
if alert['triggered']:
continue
triggered = False
if alert['type'] == 'above' and current_price > alert['threshold']:
triggered = True
message = f"BTC price ${current_price:,.2f} is above ${alert['threshold']:,.2f}"
elif alert['type'] == 'below' and current_price < alert['threshold']:
triggered = True
message = f"BTC price ${current_price:,.2f} is below ${alert['threshold']:,.2f}"
elif alert['type'] == 'change_percent' and self.last_price:
change_percent = ((current_price - self.last_price) / self.last_price) * 100
if abs(change_percent) >= alert['threshold']:
triggered = True
message = f"BTC price changed {change_percent:+.2f}% (${current_price:,.2f})"
if triggered:
alert['triggered'] = True
alert['message'] = message
alert['price'] = current_price
alert['timestamp'] = datetime.utcnow()
triggered_alerts.append(alert)
self.last_price = current_price
return triggered_alerts
def send_email_alert(self, alert: Dict):
"""
Send email notification for triggered alert.
"""
try:
msg = MIMEText(f"""
Bitcoin Price Alert Triggered!
{alert['message']}
Alert Type: {alert['type']}
Threshold: {alert['threshold']}
Current Price: ${alert['price']:,.2f}
Time: {alert['timestamp']}
This is an automated alert from your Bitcoin price monitor.
""")
msg['Subject'] = f"BTC Alert: {alert['message']}"
msg['From'] = self.email_config['from_email']
msg['To'] = self.email_config['to_email']
with smtplib.SMTP(self.email_config['smtp_server'],
self.email_config['smtp_port']) as server:
server.starttls()
server.login(self.email_config['username'],
self.email_config['password'])
server.send_message(msg)
print(f"Alert sent: {alert['message']}")
except Exception as e:
print(f"Failed to send email: {e}")
def monitor(self, check_interval: int = 60):
"""
Start monitoring Bitcoin price.
check_interval: seconds between checks
"""
print(f"Starting Bitcoin price monitor (checking every {check_interval}s)...")
while True:
try:
triggered = self.check_alerts()
for alert in triggered:
print(f"ALERT TRIGGERED: {alert['message']}")
self.send_email_alert(alert)
# Display current price
current = self.api.get_price()
if current:
print(f"[{datetime.now().strftime('%H:%M:%S')}] {self.api.format_price(current)}")
time.sleep(check_interval)
except KeyboardInterrupt:
print("\nMonitoring stopped by user")
break
except Exception as e:
print(f"Error in monitoring loop: {e}")
time.sleep(check_interval)
# Usage example
if __name__ == "__main__":
# Configure email settings
email_config = {
'smtp_server': 'smtp.gmail.com',
'smtp_port': 587,
'from_email': '[email protected]',
'to_email': '[email protected]',
'username': '[email protected]',
'password': 'your_app_password' # Use app-specific password
}
monitor = PriceAlert(email_config)
# Add alerts
monitor.add_alert('above', 55000) # Alert if BTC > $55,000
monitor.add_alert('below', 50000) # Alert if BTC < $50,000
monitor.add_alert('change_percent', 5) # Alert if price changes 5%
# Start monitoring (checks every 60 seconds)
monitor.monitor(check_interval=60)
Gmail Setup: For Gmail, you need to enable "Less secure app access" or use App Passwords. For production, use services like SendGrid or AWS SES for better deliverability.
5. Data Analysis with Pandas
Statistical Analysis
# analysis.py - Advanced Bitcoin price analysis
import pandas as pd
import numpy as np
from btc_api import BitcoinHistoricalData
class BitcoinAnalysis:
"""
Perform statistical analysis on Bitcoin price data.
"""
def __init__(self):
self.data = BitcoinHistoricalData()
def get_price_statistics(self, days: int = 365) -> Dict:
"""
Calculate comprehensive price statistics.
"""
df = self.data.get_ohlc_data(days=days)
if df.empty:
return {}
stats = {
'current_price': df['close'].iloc[-1],
'mean_price': df['close'].mean(),
'median_price': df['close'].median(),
'std_dev': df['close'].std(),
'min_price': df['close'].min(),
'max_price': df['close'].max(),
'price_range': df['close'].max() - df['close'].min(),
# Returns
'total_return': (df['close'].iloc[-1] / df['close'].iloc[0] - 1) * 100,
'avg_daily_return': df['close'].pct_change().mean() * 100,
# Volatility
'daily_volatility': df['close'].pct_change().std() * 100,
'annual_volatility': df['close'].pct_change().std() * np.sqrt(365) * 100,
# Volume
'avg_daily_volume': df['volume'].mean(),
'total_volume': df['volume'].sum(),
}
return stats
def detect_support_resistance(self, df: pd.DataFrame, window: int = 20) -> Dict:
"""
Detect support and resistance levels.
"""
# Calculate local minima and maxima
df['local_min'] = df['low'] == df['low'].rolling(window=window, center=True).min()
df['local_max'] = df['high'] == df['high'].rolling(window=window, center=True).max()
support_levels = df[df['local_min']]['low'].values
resistance_levels = df[df['local_max']]['high'].values
return {
'support_levels': np.round(support_levels, 2).tolist(),
'resistance_levels': np.round(resistance_levels, 2).tolist(),
'current_support': support_levels[-1] if len(support_levels) > 0 else None,
'current_resistance': resistance_levels[-1] if len(resistance_levels) > 0 else None
}
def calculate_rsi(self, df: pd.DataFrame, period: int = 14) -> pd.Series:
"""
Calculate Relative Strength Index (RSI).
"""
delta = df['close'].diff()
gain = (delta.where(delta > 0, 0)).rolling(window=period).mean()
loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean()
rs = gain / loss
rsi = 100 - (100 / (1 + rs))
return rsi
def generate_trading_signals(self, df: pd.DataFrame) -> pd.DataFrame:
"""
Generate simple moving average crossover signals.
"""
df = df.copy()
# Calculate MAs
df['MA_fast'] = df['close'].rolling(window=10).mean()
df['MA_slow'] = df['close'].rolling(window=30).mean()
# Generate signals
df['signal'] = 0
df.loc[df['MA_fast'] > df['MA_slow'], 'signal'] = 1 # Buy signal
df.loc[df['MA_fast'] < df['MA_slow'], 'signal'] = -1 # Sell signal
# Detect crossovers
df['position'] = df['signal'].diff()
return df
# Usage example
if __name__ == "__main__":
analysis = BitcoinAnalysis()
# Get comprehensive statistics
stats = analysis.get_price_statistics(days=365)
print("Bitcoin Price Statistics (Last 365 Days)")
print("=" * 50)
print(f"Current Price: ${stats['current_price']:,.2f}")
print(f"Mean Price: ${stats['mean_price']:,.2f}")
print(f"Price Range: ${stats['min_price']:,.2f} - ${stats['max_price']:,.2f}")
print(f"\nTotal Return: {stats['total_return']:.2f}%")
print(f"Average Daily Return: {stats['avg_daily_return']:.4f}%")
print(f"\nDaily Volatility: {stats['daily_volatility']:.2f}%")
print(f"Annual Volatility: {stats['annual_volatility']:.2f}%")
# Get OHLC data for advanced analysis
df = analysis.data.get_ohlc_data(days=90)
# Calculate RSI
df['RSI'] = analysis.calculate_rsi(df)
print(f"\nCurrent RSI: {df['RSI'].iloc[-1]:.2f}")
if df['RSI'].iloc[-1] > 70:
print(" → Overbought (potential sell signal)")
elif df['RSI'].iloc[-1] < 30:
print(" → Oversold (potential buy signal)")
# Detect support/resistance
levels = analysis.detect_support_resistance(df)
print(f"\nSupport Levels: {levels['support_levels'][-5:]}")
print(f"Resistance Levels: {levels['resistance_levels'][-5:]}")
6. Price Visualization with Matplotlib
Creating Price Charts
# visualize.py - Bitcoin price visualization
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
from btc_api import BitcoinHistoricalData
class BitcoinVisualizer:
"""
Create visualizations of Bitcoin price data.
"""
def __init__(self):
self.data = BitcoinHistoricalData()
plt.style.use('seaborn-v0_8-darkgrid')
def plot_price_history(self, days: int = 90, save_path: str = None):
"""
Plot Bitcoin price history with volume.
"""
df = self.data.get_ohlc_data(days=days)
if df.empty:
print("No data available")
return
# Calculate moving averages
df['MA_7'] = df['close'].rolling(window=7).mean()
df['MA_25'] = df['close'].rolling(window=25).mean()
# Create subplots
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(14, 10),
gridspec_kw={'height_ratios': [3, 1]})
# Plot price and moving averages
ax1.plot(df.index, df['close'], label='BTC Price', linewidth=2, color='#f7931a')
ax1.plot(df.index, df['MA_7'], label='7-day MA', linewidth=1.5,
linestyle='--', alpha=0.7)
ax1.plot(df.index, df['MA_25'], label='25-day MA', linewidth=1.5,
linestyle='--', alpha=0.7)
ax1.set_title(f'Bitcoin Price - Last {days} Days', fontsize=16, fontweight='bold')
ax1.set_ylabel('Price (USD)', fontsize=12)
ax1.legend(loc='upper left')
ax1.grid(True, alpha=0.3)
# Format y-axis as currency
ax1.yaxis.set_major_formatter(plt.FuncFormatter(lambda x, p: f'${x:,.0f}'))
# Plot volume
colors = ['g' if df['close'].iloc[i] >= df['open'].iloc[i] else 'r'
for i in range(len(df))]
ax2.bar(df.index, df['volume'], color=colors, alpha=0.5)
ax2.set_ylabel('Volume (BTC)', fontsize=12)
ax2.set_xlabel('Date', fontsize=12)
ax2.grid(True, alpha=0.3)
# Format x-axis
ax2.xaxis.set_major_formatter(mdates.DateFormatter('%m/%d'))
plt.xticks(rotation=45)
plt.tight_layout()
if save_path:
plt.savefig(save_path, dpi=300, bbox_inches='tight')
print(f"Chart saved to {save_path}")
plt.show()
def plot_candlestick(self, days: int = 30, save_path: str = None):
"""
Create candlestick chart.
"""
from matplotlib.patches import Rectangle
df = self.data.get_ohlc_data(days=days)
if df.empty:
return
fig, ax = plt.subplots(figsize=(14, 8))
for idx, (date, row) in enumerate(df.iterrows()):
# Determine color
color = 'g' if row['close'] >= row['open'] else 'r'
# Draw candlestick body
body_height = abs(row['close'] - row['open'])
body_bottom = min(row['open'], row['close'])
body = Rectangle((idx - 0.3, body_bottom), 0.6, body_height,
facecolor=color, edgecolor='black', alpha=0.8)
ax.add_patch(body)
# Draw wicks
ax.plot([idx, idx], [row['low'], row['high']], color='black', linewidth=1)
ax.set_xlim(-1, len(df))
ax.set_ylim(df['low'].min() * 0.95, df['high'].max() * 1.05)
ax.set_title(f'Bitcoin Candlestick Chart - Last {days} Days',
fontsize=16, fontweight='bold')
ax.set_ylabel('Price (USD)', fontsize=12)
ax.yaxis.set_major_formatter(plt.FuncFormatter(lambda x, p: f'${x:,.0f}'))
# Set x-axis labels
step = max(len(df) // 10, 1)
ax.set_xticks(range(0, len(df), step))
ax.set_xticklabels([df.index[i].strftime('%m/%d') for i in range(0, len(df), step)],
rotation=45)
plt.grid(True, alpha=0.3)
plt.tight_layout()
if save_path:
plt.savefig(save_path, dpi=300)
plt.show()
# Usage
if __name__ == "__main__":
viz = BitcoinVisualizer()
# Plot 90-day price history
viz.plot_price_history(days=90, save_path='btc_price_90d.png')
# Plot 30-day candlestick chart
viz.plot_candlestick(days=30, save_path='btc_candlestick_30d.png')
7. Real-Time Monitoring with WebSockets
# realtime.py - WebSocket price monitoring
import websocket
import json
from datetime import datetime
class BitcoinRealtimeMonitor:
"""
Monitor Bitcoin price in real-time using WebSocket.
"""
def __init__(self):
self.ws = None
self.prices = []
def on_message(self, ws, message):
"""Handle incoming WebSocket messages."""
data = json.loads(message)
if 'data' in data:
ticker = data['data']
price = float(ticker['c']) # Current price
change = float(ticker['P']) # 24h change percent
timestamp = datetime.now().strftime('%H:%M:%S')
print(f"[{timestamp}] BTC: ${price:,.2f} ({change:+.2f}% 24h)")
self.prices.append({
'timestamp': timestamp,
'price': price,
'change': change
})
def on_error(self, ws, error):
"""Handle WebSocket errors."""
print(f"WebSocket error: {error}")
def on_close(self, ws, close_status_code, close_msg):
"""Handle WebSocket closure."""
print("WebSocket connection closed")
def on_open(self, ws):
"""Handle WebSocket connection open."""
print("WebSocket connection established")
print("Monitoring Bitcoin price in real-time...\n")
def start(self):
"""Start real-time monitoring."""
# Binance WebSocket stream
url = "wss://stream.binance.com:9443/stream?streams=btcusdt@ticker"
self.ws = websocket.WebSocketApp(
url,
on_message=self.on_message,
on_error=self.on_error,
on_close=self.on_close,
on_open=self.on_open
)
self.ws.run_forever()
# Usage
if __name__ == "__main__":
monitor = BitcoinRealtimeMonitor()
monitor.start() # Press Ctrl+C to stop
8. Production-Ready Implementation
# Production-ready Bitcoin price tracker
import redis
import logging
from typing import Optional
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
class ProductionBitcoinAPI:
"""
Production-ready Bitcoin API with caching, logging, and error handling.
"""
def __init__(self, redis_host: str = 'localhost', cache_ttl: int = 60):
self.logger = logging.getLogger(__name__)
self.cache_ttl = cache_ttl
try:
self.redis = redis.Redis(host=redis_host, port=6379, db=0,
decode_responses=True)
self.redis.ping()
self.logger.info("Redis connection established")
except Exception as e:
self.logger.warning(f"Redis unavailable: {e}")
self.redis = None
def get_price_with_cache(self, currency: str = 'usd') -> Optional[Dict]:
"""
Get Bitcoin price with Redis caching.
"""
cache_key = f"btc_price:{currency}"
# Check cache
if self.redis:
try:
cached = self.redis.get(cache_key)
if cached:
self.logger.info(f"Cache hit for {currency}")
return eval(cached)
except Exception as e:
self.logger.error(f"Redis error: {e}")
# Fetch from API
api = BitcoinPriceAPI()
price_data = api.get_price(currency)
if not price_data:
self.logger.error(f"Failed to fetch price for {currency}")
return None
# Cache result
if self.redis:
try:
self.redis.setex(cache_key, self.cache_ttl, str(price_data))
self.logger.info(f"Cached price for {currency}")
except Exception as e:
self.logger.error(f"Failed to cache: {e}")
return price_data
Conclusion
You now have a complete Python toolkit for working with Bitcoin price data. From simple price lookups to sophisticated analysis and real-time monitoring, these examples provide production-ready code you can adapt to your needs.
Key takeaways: always implement error handling and fallbacks, use caching to respect API rate limits, leverage pandas for data analysis, and choose WebSockets over polling for real-time data.
Related Articles
Need Traditional Currency Data?
While crypto APIs provide digital asset prices, UniRate API offers exchange rates for 160+ traditional currencies with historical data back to 1999. Perfect for multi-currency applications combining crypto and fiat.
Explore Currency API