Trong bài viết này, tôi sẽ chia sẻ kinh nghiệm thực chiến khi xây dựng hệ thống trực quan hóa dữ liệu K-line cho thị trường tiền mã hóa sử dụng Python và Tardis API. Đây là giải pháp tôi đã triển khai cho nhiều dự án trading và phân tích thị trường, với những bài học quý giá về kiến trúc, tối ưu hiệu suất và kiểm soát chi phí.

Tardis API là gì và tại sao nên sử dụng

Tardis (tardis.dev) là một trong những nhà cung cấp dữ liệu thị trường tiền mã hóa hàng đầu, cung cấp dữ liệu lịch sử real-time với độ trễ thấp và độ tin cậy cao. So với việc tự xây dựng hệ thống thu thập dữ liệu từ nhiều sàn giao dịch, Tardis giúp tiết kiệm hàng trăm giờ phát triển và vận hành.

Tính năng nổi bật

Cài đặt môi trường và dependencies

# Tạo virtual environment
python -m venv kline_venv
source kline_venv/bin/activate  # Linux/Mac

kline_venv\Scripts\activate # Windows

Cài đặt dependencies

pip install tardis-client pandas numpy matplotlib plotly pip install kaleido asyncio aiohttp python-dotenv

Kiểm tra phiên bản

python -c "import tardis_client; print(tardis_client.__version__)"

Kết nối API và lấy dữ liệu K-line

import os
from tardis_client import TardisClient, Channels
from datetime import datetime, timedelta
import pandas as pd

Cấu hình API credentials

TARDIS_API_KEY = os.getenv("TARDIS_API_KEY", "your_tardis_api_key") class KLineDataFetcher: """Lớp xử lý lấy dữ liệu K-line từ Tardis API""" def __init__(self, api_key: str): self.client = TardisClient(api_key=api_key) self.exchange = "binance" self.symbol = "BTC-USDT" self.timeframe = "1m" async def fetch_historical_klines( self, start_time: datetime, end_time: datetime ) -> pd.DataFrame: """ Lấy dữ liệu K-line lịch sử trong khoảng thời gian xác định Args: start_time: Thời gian bắt đầu end_time: Thời gian kết thúc Returns: DataFrame chứa dữ liệu K-line """ messages = [] async for message in self.client.market_data_stream( exchange=self.exchange, channels=[Channels.FUTURES, Channels.COIN_MAPPING], symbols=[self.symbol], from_date=start_time.isoformat(), to_date=end_time.isoformat(), ): if message.type == "kline": kline = message.data messages.append({ "timestamp": pd.to_datetime(kline["timestamp"], unit="ms"), "open": float(kline["open"]), "high": float(kline["high"]), "low": float(kline["low"]), "close": float(kline["close"]), "volume": float(kline["volume"]), "trades": kline.get("trades", 0), "quote_volume": kline.get("quote_volume", 0), }) df = pd.DataFrame(messages) if not df.empty: df.set_index("timestamp", inplace=True) df.sort_index(inplace=True) return df async def stream_realtime_klines(self, duration_seconds: int = 60): """ Stream dữ liệu K-line real-time trong khoảng thời gian xác định Args: duration_seconds: Thời gian stream (giây) """ import asyncio messages = [] start = datetime.now() async for message in self.client.market_data_stream( exchange=self.exchange, channels=[Channels.FUTURES], symbols=[self.symbol], ): if message.type == "kline": messages.append(message.data) if (datetime.now() - start).seconds >= duration_seconds: break return messages

Sử dụng

async def main(): fetcher = KLineDataFetcher(api_key=TARDIS_API_KEY) # Lấy dữ liệu 24 giờ gần nhất end_time = datetime.now() start_time = end_time - timedelta(hours=24) df = await fetcher.fetch_historical_klines(start_time, end_time) print(f"Đã lấy {len(df)} candles") print(df.tail())

Chạy

asyncio.run(main())

Trực quan hóa dữ liệu với Plotly

import plotly.graph_objects as go
from plotly.subplots import make_subplots
import plotly.express as px

class KLineVisualizer:
    """Lớp trực quan hóa dữ liệu K-line với nhiều loại chart"""
    
    def __init__(self, df: pd.DataFrame):
        self.df = df
    
    def create_candlestick_chart(
        self, 
        title: str = "BTC/USDT K-line Chart",
        show_volume: bool = True
    ) -> go.Figure:
        """
        Tạo biểu đồ nến Nhật với khối lượng giao dịch
        
        Args:
            title: Tiêu đề biểu đồ
            show_volume: Hiển thị khối lượng
        
        Returns:
            Plotly Figure object
        """
        fig = make_subplots(
            rows=2 if show_volume else 1,
            cols=1,
            shared_xaxes=True,
            vertical_spacing=0.03,
            row_heights=[0.7, 0.3] if show_volume else [1.0],
            subplot_titles=("Price", "Volume")
        )
        
        # Candlestick
        fig.add_trace(
            go.Candlestick(
                x=self.df.index,
                open=self.df["open"],
                high=self.df["high"],
                low=self.df["low"],
                close=self.df["close"],
                name="OHLC",
                increasing_line_color="#26a69a",
                decreasing_line_color="#ef5350",
            ),
            row=1, col=1
        )
        
        # Moving Averages
        self.df["MA7"] = self.df["close"].rolling(window=7).mean()
        self.df["MA25"] = self.df["close"].rolling(window=25).mean()
        self.df["MA99"] = self.df["close"].rolling(window=99).mean()
        
        for ma, color, name in [
            ("MA7", "#FF6B6B", "MA7"),
            ("MA25", "#4ECDC4", "MA25"),
            ("MA99", "#45B7D1", "MA99")
        ]:
            fig.add_trace(
                go.Scatter(
                    x=self.df.index,
                    y=self.df[ma],
                    mode="lines",
                    name=name,
                    line=dict(color=color, width=1.5)
                ),
                row=1, col=1
            )
        
        # Volume bars
        if show_volume:
            colors = ["#26a69a" if self.df["close"].iloc[i] >= self.df["open"].iloc[i] 
                      else "#ef5350" for i in range(len(self.df))]
            
            fig.add_trace(
                go.Bar(
                    x=self.df.index,
                    y=self.df["volume"],
                    name="Volume",
                    marker_color=colors,
                    opacity=0.7
                ),
                row=2, col=1
            )
        
        fig.update_layout(
            title=dict(text=title, font=dict(size=20)),
            xaxis_rangeslider_visible=False,
            template="plotly_dark",
            height=800,
            legend=dict(
                orientation="h",
                yanchor="bottom",
                y=1.02,
                xanchor="right",
                x=1
            ),
            margin=dict(l=50, r=50, t=80, b=50)
        )
        
        fig.update_xaxes(title_text="Time", row=2, col=1)
        fig.update_yaxes(title_text="Price (USDT)", row=1, col=1)
        
        return fig
    
    def create_volume_profile(self, bins: int = 50) -> go.Figure:
        """
        Tạo biểu đồ Volume Profile
        
        Args:
            bins: Số lượng bins cho phân bố giá
        
        Returns:
            Plotly Figure object
        """
        price_bins = pd.cut(self.df["close"], bins=bins)
        volume_by_price = self.df.groupby(price_bins)["volume"].sum()
        
        fig = go.Figure()
        
        fig.add_trace(go.Bar(
            y=volume_by_price.index.astype(str),
            x=volume_by_price.values,
            orientation="h",
            marker=dict(
                color=volume_by_price.values,
                colorscale="Viridis"
            )
        ))
        
        fig.update_layout(
            title="Volume Profile",
            xaxis_title="Total Volume",
            yaxis_title="Price Range",
            template="plotly_dark",
            height=600
        )
        
        return fig
    
    def add_indicators(self, fig: go.Figure) -> go.Figure:
        """Thêm các chỉ báo kỹ thuật vào biểu đồ"""
        
        # RSI
        delta = self.df["close"].diff()
        gain = (delta.where(delta > 0, 0)).rolling(window=14).mean()
        loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean()
        rs = gain / loss
        self.df["RSI"] = 100 - (100 / (1 + rs))
        
        # MACD
        exp1 = self.df["close"].ewm(span=12, adjust=False).mean()
        exp2 = self.df["close"].ewm(span=26, adjust=False).mean()
        self.df["MACD"] = exp1 - exp2
        self.df["Signal"] = self.df["MACD"].ewm(span=9, adjust=False).mean()
        self.df["Histogram"] = self.df["MACD"] - self.df["Signal"]
        
        return fig

Sử dụng

visualizer = KLineVisualizer(df)

fig = visualizer.create_candlestick_chart()

fig.show()

Tối ưu hiệu suất với Async/Await và Connection Pooling

Trong production, việc xử lý hàng triệu records đòi hỏi chiến lược async hiệu quả. Dưới đây là kiến trúc tôi đã tối ưu qua nhiều lần benchmark.

import asyncio
import aiohttp
from typing import List, Dict, Optional
from dataclasses import dataclass
from contextlib import asynccontextmanager
import json
from datetime import datetime

@dataclass
class KLineRecord:
    timestamp: datetime
    open: float
    high: float
    low: float
    close: float
    volume: float

class AsyncKLineCollector:
    """
    Bộ thu thập dữ liệu K-line async với connection pooling
    và batch processing
    """
    
    def __init__(
        self,
        api_key: str,
        max_concurrent: int = 10,
        rate_limit: int = 100  # requests per second
    ):
        self.api_key = api_key
        self.max_concurrent = max_concurrent
        self.rate_limit = rate_limit
        self.base_url = "https://api.tardis.dev/v1"
        self._session: Optional[aiohttp.ClientSession] = None
        self._semaphore = asyncio.Semaphore(max_concurrent)
        self._request_times: List[float] = []
        self._lock = asyncio.Lock()
        
        # Performance metrics
        self.metrics = {
            "total_requests": 0,
            "failed_requests": 0,
            "total_latency_ms": 0,
            "cache_hits": 0,
        }
    
    @asynccontextmanager
    async def session(self):
        """Quản lý aiohttp session với connection pooling"""
        connector = aiohttp.TCPConnector(
            limit=self.max_concurrent,
            limit_per_host=10,
            ttl_dns_cache=300,
            enable_cleanup_closed=True,
        )
        
        timeout = aiohttp.ClientTimeout(total=30, connect=5)
        
        async with aiohttp.ClientSession(
            connector=connector,
            timeout=timeout,
            headers={"Authorization": f"Bearer {self.api_key}"}
        ) as session:
            self._session = session
            yield session
    
    async def _rate_limit(self):
        """Điều chỉnh tốc độ request để không vượt quá rate limit"""
        async with self._lock:
            now = asyncio.get_event_loop().time()
            # Loại bỏ các request cũ hơn 1 giây
            self._request_times = [
                t for t in self._request_times 
                if now - t < 1.0
            ]
            
            if len(self._request_times) >= self.rate_limit:
                sleep_time = 1.0 - (now - self._request_times[0])
                if sleep_time > 0:
                    await asyncio.sleep(sleep_time)
            
            self._request_times.append(now)
    
    async def fetch_klines_batch(
        self,
        exchange: str,
        symbol: str,
        timeframe: str,
        start_time: datetime,
        end_time: datetime
    ) -> List[KLineRecord]:
        """
        Lấy dữ liệu K-line theo batch với pagination
        
        Args:
            exchange: Tên sàn giao dịch
            symbol: Cặp tiền (VD: BTC-USDT)
            timeframe: Khung thời gian (VD: 1m, 5m, 1h)
            start_time: Thời gian bắt đầu
            end_time: Thời gian kết thúc
        
        Returns:
            Danh sách KLineRecord
        """
        records = []
        current_start = start_time
        
        while current_start < end_time:
            async with self._semaphore:
                await self._rate_limit()
                
                batch_end = min(
                    current_start + timedelta(days=7),
                    end_time
                )
                
                try:
                    data = await self._fetch_single_batch(
                        exchange, symbol, timeframe,
                        current_start, batch_end
                    )
                    records.extend(data)
                    
                    self.metrics["total_requests"] += 1
                    self.metrics["total_latency_ms"] += data.get("_latency_ms", 0)
                    
                except Exception as e:
                    self.metrics["failed_requests"] += 1
                    print(f"Lỗi fetch batch: {e}")
                
                current_start = batch_end
        
        return records
    
    async def _fetch_single_batch(
        self,
        exchange: str,
        symbol: str,
        timeframe: str,
        start: datetime,
        end: datetime
    ) -> List[Dict]:
        """Fetch một batch dữ liệu"""
        
        params = {
            "exchange": exchange,
            "symbol": symbol,
            "timeframe": timeframe,
            "from": int(start.timestamp() * 1000),
            "to": int(end.timestamp() * 1000),
            "limit": 1000,
        }
        
        start_ts = asyncio.get_event_loop().time()
        
        async with self._session.get(
            f"{self.base_url}/klines",
            params=params
        ) as response:
            response.raise_for_status()
            data = await response.json()
        
        latency_ms = (asyncio.get_event_loop().time() - start_ts) * 1000
        data["_latency_ms"] = latency_ms
        
        return [
            KLineRecord(
                timestamp=datetime.fromtimestamp(r["timestamp"] / 1000),
                open=float(r["open"]),
                high=float(r["high"]),
                low=float(r["low"]),
                close=float(r["close"]),
                volume=float(r["volume"]),
            )
            for r in data.get("data", [])
        ]
    
    def get_performance_report(self) -> Dict:
        """Lấy báo cáo hiệu suất"""
        avg_latency = (
            self.metrics["total_latency_ms"] / self.metrics["total_requests"]
            if self.metrics["total_requests"] > 0 else 0
        )
        
        success_rate = (
            (self.metrics["total_requests"] - self.metrics["failed_requests"])
            / self.metrics["total_requests"] * 100
            if self.metrics["total_requests"] > 0 else 0
        )
        
        return {
            "total_requests": self.metrics["total_requests"],
            "failed_requests": self.metrics["failed_requests"],
            "success_rate": f"{success_rate:.2f}%",
            "average_latency_ms": f"{avg_latency:.2f}ms",
            "cache_hits": self.metrics["cache_hits"],
        }

Benchmark async collector

async def benchmark_async_collector(): """Benchmark hiệu suất của async collector""" import time collector = AsyncKLineCollector( api_key=TARDIS_API_KEY, max_concurrent=20, rate_limit=100 ) test_pairs = [ ("binance", "BTC-USDT", "1m"), ("binance", "ETH-USDT", "1m"), ("bybit", "BTC-USDT", "1m"), ("okx", "ETH-USDT", "1m"), ] start_time = time.time() async with collector.session(): tasks = [ collector.fetch_klines_batch( ex, sym, tf, datetime.now() - timedelta(hours=1), datetime.now() ) for ex, sym, tf in test_pairs ] results = await asyncio.gather(*tasks) total_time = time.time() - start_time print(f"Tổng thời gian: {total_time:.2f}s") print(f"Tổng records: {sum(len(r) for r in results)}") print(f"Performance: {collector.get_performance_report()}")

Chạy benchmark

asyncio.run(benchmark_async_collector())

Xử lý dữ liệu lớn với Polars và Memory Optimization

Với dataset hàng triệu rows, Pandas trở nên chậm và tiêu tốn nhiều RAM. Tôi đã chuyển sang Polars và đạt được cải thiện đáng kể.

import polars as pl
from typing import Optional
import numpy as np

class KLineDataProcessor:
    """
    Xử lý dữ liệu K-line với Polars để tối ưu memory và speed
    """
    
    def __init__(self, df: pd.DataFrame):
        # Chuyển đổi sang Polars ngay lập tức
        self.df = pl.DataFrame({
            "timestamp": df.index,
            "open": df["open"].astype(np.float32),  # Float32 thay vì Float64
            "high": df["high"].astype(np.float32),
            "low": df["low"].astype(np.float32),
            "close": df["close"].astype(np.float32),
            "volume": df["volume"].astype(np.float32),
        })
    
    def calculate_indicators(self) -> pl.DataFrame:
        """Tính toán các chỉ báo kỹ thuật với Polars expressions"""
        
        return self.df.with_columns([
            # Moving Averages
            pl.col("close").rolling_mean(window_size=7).alias("ma7"),
            pl.col("close").rolling_mean(window_size=25).alias("ma25"),
            pl.col("close").rolling_mean(window_size=99).alias("ma99"),
            
            # Bollinger Bands
            pl.col("close").rolling_mean(window_size=20).alias("bb_middle"),
            (pl.col("close").rolling_std(window_size=20) * 2).alias("bb_std"),
            
            # RSI
            self._calculate_rsi(pl.col("close"), 14).alias("rsi"),
            
            # ATR (Average True Range)
            self._calculate_atr(14).alias("atr"),
            
            # Returns
            pl.col("close").pct_change().alias("returns"),
            pl.col("close").pct_change(periods=7).alias("returns_7d"),
            
            # Volatility
            pl.col("close").rolling_std(window_size=20).alias("volatility"),
        ])
    
    @staticmethod
    def _calculate_rsi(close: pl.Expr, period: int) -> pl.Expr:
        """Tính RSI sử dụng Polars expressions"""
        delta = close.diff()
        
        gain = delta.clip(lower_bound=0)
        loss = (-delta.clip(upper_bound=0))
        
        avg_gain = gain.rolling_mean(window_size=period)
        avg_loss = loss.rolling_mean(window_size=period)
        
        rs = avg_gain / avg_loss
        rsi = 100 - (100 / (1 + rs))
        
        return rsi
    
    def _calculate_atr(self, period: int) -> pl.Expr:
        """Tính ATR (Average True Range)"""
        high_low = pl.col("high") - pl.col("low")
        high_close = (pl.col("high") - pl.col("close").shift(1)).abs()
        low_close = (pl.col("low") - pl.col("close").shift(1)).abs()
        
        true_range = pl.concat([high_low, high_close, low_close]).max()
        
        return true_range.rolling_mean(window_size=period)
    
    def resample_timeframe(
        self, 
        timeframe: str = "1H"
    ) -> pl.DataFrame:
        """
        Resample dữ liệu sang timeframe khác
        
        Args:
            timeframe: Chuỗi timeframe (1H, 4H, 1D, v.v.)
        
        Returns:
            DataFrame đã resampled
        """
        return self.df.group_by_dynamic(
            "timestamp",
            every=timeframe,
        ).agg([
            pl.col("open").first(),
            pl.col("high").max(),
            pl.col("low").min(),
            pl.col("close").last(),
            pl.col("volume").sum(),
        ])
    
    def detect_patterns(self) -> pl.DataFrame:
        """Phát hiện các mẫu nến cơ bản"""
        
        df = self.df.with_columns([
            # Doji
            ((pl.col("open") - pl.col("close")).abs() / 
             (pl.col("high") - pl.col("low")) < 0.1).alias("is_doji"),
            
            # Hammer
            (
                (pl.col("high") - pl.col("low") > 2 * (pl.col("open") - pl.col("close")).abs()) &
                ((pl.col("high") - pl.col("close")).abs() < 0.1 * (pl.col("high") - pl.col("low")))
            ).alias("is_hammer"),
            
            # Engulfing
            (
                (pl.col("close") > pl.col("open").shift(1)) &
                (pl.col("open") < pl.col("close").shift(1)) &
                (pl.col("close").diff() > 0)
            ).alias("is_bullish_engulfing"),
        ])
        
        return df
    
    def export_parquet(self, path: str, compression: str = "zstd"):
        """
        Export sang Parquet với compression tối ưu
        
        Args:
            path: Đường dẫn file
            compression: Thuật toán nén (zstd, lz4, snappy)
        """
        self.df.write_parquet(path, compression=compression)
    
    @staticmethod
    def read_parquet(path: str) -> pl.DataFrame:
        """Đọc file Parquet với streaming cho file lớn"""
        return pl.scan_parquet(path)

Benchmark: Pandas vs Polars

def benchmark_pandas_vs_polars(): """So sánh hiệu suất Pandas và Polars""" import time # Tạo dataset lớn (1 triệu rows) n = 1_000_000 dates = pd.date_range("2020-01-01", periods=n, freq="1min") df = pd.DataFrame({ "timestamp": dates, "open": np.random.randn(n).cumsum() + 100, "high": np.random.randn(n).cumsum() + 102, "low": np.random.randn(n).cumsum() + 98, "close": np.random.randn(n).cumsum() + 100, "volume": np.random.rand(n) * 1000, }) df["high"] = df[["open", "high", "close"]].max(axis=1) df["low"] = df[["open", "low", "close"]].min(axis=1) # Benchmark Pandas start = time.time() df_pandas = df.copy() df_pandas["ma7"] = df_pandas["close"].rolling(7).mean() df_pandas["ma25"] = df_pandas["close"].rolling(25).mean() pandas_time = time.time() - start pandas_memory = df_pandas.memory_usage(deep=True).sum() / 1024**2 # Benchmark Polars start = time.time() processor = KLineDataProcessor(df) df_polars = processor.calculate_indicators() polars_time = time.time() - start polars_memory = df_polars.estimated_size() / 1024**2 print(f"Pandas: {pandas_time:.2f}s, Memory: {pandas_memory:.2f}MB") print(f"Polars: {polars_time:.2f}s, Memory: {polars_memory:.2f}MB") print(f"Speed improvement: {pandas_time/polars_time:.1f}x") print(f"Memory reduction: {pandas_memory/polars_memory:.1f}x")

benchmark_pandas_vs_polars()

Memory Optimization và Chunked Processing

Đối với dataset cực lớn (hàng chục GB), việc xử lý theo chunks là bắt buộc. Dưới đây là pattern tôi sử dụng trong production.

import psutil
from pathlib import Path
from typing import Iterator, Callable
import gc

class ChunkedKLineProcessor:
    """
    Xử lý dữ liệu K-line theo chunks để tiết kiệm memory
    """
    
    CHUNK_SIZE = 100_000  # Rows per chunk
    
    def __init__(self, chunk_size: int = CHUNK_SIZE):
        self.chunk_size = chunk_size
        self.processed_chunks = 0
    
    def process_large_file(
        self,
        input_path: str,
        output_path: str,
        transform_func: Callable[[pl.DataFrame], pl.DataFrame]
    ):
        """
        Xử lý file lớn theo chunks và ghi ra file mới
        
        Args:
            input_path: Đường dẫn file input
            output_path: Đường dẫn file output
            transform_func: Hàm transform cho mỗi chunk
        """
        # Kiểm tra memory trước khi xử lý
        available_memory = psutil.virtual_memory().available / 1024**3
        print(f"Available memory: {available_memory:.2f} GB")
        
        # Sử dụng scan_parquet cho lazy evaluation
        lazy_df = pl.scan_parquet(input_path)
        
        # Xử lý theo batches
        offset = 0
        first_chunk = True
        
        while True:
            # Đọc chunk với giới hạn
            chunk = lazy_df.slice(offset, self.chunk_size).collect()
            
            if chunk.is_empty():
                break
            
            # Transform chunk
            transformed = transform_func(chunk)
            
            # Ghi append vào file output
            mode = "overwrite" if first_chunk else "append"
            transformed.write_parquet(
                output_path,
                write_mode=mode
            )
            first_chunk = False
            
            offset += self.chunk_size
            self.processed_chunks += 1
            
            # Force garbage collection sau mỗi chunk
            gc.collect()
            
            # Log progress
            print(f"Processed chunk {self.processed_chunks}: "
                  f"{len(chunk)} rows, "
                  f"Memory: {psutil.virtual_memory().percent}%")
    
    def streaming_analysis(
        self,
        file_path: str,
        analysis_func: Callable[[pl.DataFrame], dict]
    ) -> Iterator[dict]:
        """
        Phân tích dữ liệu theo streaming
        
        Args:
            file_path: Đường dẫn file
            analysis_func: Hàm phân tích cho mỗi chunk
        
        Yields:
            Kết quả phân tích cho mỗi chunk
        """
        for chunk in pl.scan_parquet(file_path).iter_slices(n_rows=self.chunk_size):
            result = analysis_func(chunk)
            yield result
            del chunk
            gc.collect()

Ví dụ sử dụng

def calculate_chunk_statistics(chunk: pl.DataFrame) -> dict: """Tính thống kê cho một chunk""" return { "count": len(chunk), "mean_close": chunk["close"].mean(), "max_close": chunk["close"].max(), "min_close": chunk["close"].min(), "total_volume": chunk["volume"].sum(), }

Sử dụng

processor = ChunkedKLineProcessor(chunk_size=50_000)

#

# Xử lý file 10GB

processor.process_large_file(

input_path="data/klines_raw.parquet",

output_path="data/klines_processed.parquet",

transform_func=lambda df: KLineDataProcessor(

df.to_pandas()

).calculate_indicators().to_pandas()

)

#

# Hoặc streaming analysis

for stats in processor.streaming_analysis(

"data/klines_processed.parquet",

calculate_chunk_statistics

):

print(stats)

Triển khai Production với Docker và Monitoring

# Dockerfile
FROM python:3.11-slim

WORKDIR /app

Cài đặt dependencies

COPY requirements.txt . RUN pip install --no-cache-dir -r requirements.txt

Copy code

COPY . .

Tạo non-root user

RUN useradd -m appuser && chown -R appuser:appuser /app USER appuser

Environment variables

ENV PYTHONUNBUFFERED=1 ENV TARDIS_API_KEY=${TARDIS_API_KEY}

Health check

HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \ CMD python healthcheck.py

Default command

CMD ["python", "main.py"]
# docker-compose.yml cho production deployment
version: '3.8'

services:
  kline-visualizer:
    build: .
    container_name: kline_visualizer
    restart: unless-stopped
    ports:
      - "8000:8000"
    environment:
      - TARDIS_API_KEY=${TARDIS_API_KEY}
      - REDIS_URL=redis://redis:6379/0
      - LOG_LEVEL=INFO
    volumes:
      - ./data:/app/data
      - ./logs:/app/logs
    depends_on:
      - redis
    deploy:
      resources:
        limits:
          cpus: '2'
          memory: 4G
        reservations:
          cpus: '0.5'