Create a funding rate arbitrage bot for perpetual exchanges.

on 2025-11-07

Introduction

Strategy that it uses

  • sort and get all possible arb opportunities
  • find the best one, if it exceeds the threshold, open the position
  • keep the position open untill the arb is valid,
  • if you find a crazy arbitrage opportunity, switch to it
  • otherwise keep the position open and wait for the next arb opportunity
  • if the ROI is not enough, close the position
  • watch and wait for the next arb opportunity

Note

  • I know the code is ass, it should be using some form of adapter pattern, or plugin system to support arbitrary exchanges but.
    • The integration of all exchanges in a common interface was a pain and I didn't wanna spend more time on it.
    • I was picking up one exchange then playing w it, I had the thought of having common interface but it wasn't a priority.

Computing the arbitrage opportunity

exchanges = ["extended", "hyperliquid", "hibachi", "lighter", "pacifica"]

@dataclass
class MarketInfo:
    name: str
    market_name: str
    funding_rate: float
    next_funding_time: int
    maxLeverage: int
    exchange: Literal[exchanges]

Get data from all sources

def combined_funding_rates():
    markets = []
    for exchange in exchanges:
        markets.extend([x.__dict__ for x in FUNDING_FETCHER[exchange]()])

    df = pd.DataFrame(markets)

    # Set index and unstack
    result = df.set_index(["name", "exchange"])[
        ["funding_rate", "next_funding_time", "maxLeverage", "market_name"]
    ].unstack("exchange")

    # Flatten column names: (funding_rate, extended) -> funding_rate_extended
    result.columns = [f"{col}_{ex}" for col, ex in result.columns]

    return result.reset_index()


def get_arbs_df():
    df = combined_funding_rates()
    arb_df = find_funding_arbs(df)
    return arb_df

Compute the arbitrage opportunity

def find_funding_arbs(df):
    """
    Find arbitrage opportunities between any two exchanges where funding rates have opposite signs.

    Returns DataFrame with columns:
    - name: asset name
    - exchange_pair: tuple of (exchange1, exchange2)
    - funding_rate_ex1: funding rate of exchange1
    - funding_rate_ex2: funding rate of exchange2
    - funding_spread: absolute difference between funding rates
    """
    arbs = []

    # Get all funding_rate columns
    funding_cols = [col for col in df.columns if col.startswith("funding_rate_")]
    exchange_names = [col.replace("funding_rate_", "") for col in funding_cols]

    # For each row (asset)
    for idx, row in df.iterrows():
        name = row["name"]

        # For each pair of exchanges
        for i, ex1 in enumerate(exchange_names):
            for j, ex2 in enumerate(exchange_names):
                if i >= j:  # Avoid duplicates (ex1, ex2) and (ex2, ex1)
                    continue

                col1 = f"funding_rate_{ex1}"
                col2 = f"funding_rate_{ex2}"

                rate1 = row[col1]
                rate2 = row[col2]

                # Skip if either is NaN
                if pd.isna(rate1) or pd.isna(rate2):
                    continue

                # Check if they have opposite signs
                if (rate1 > 0 and rate2 < 0) or (rate1 < 0 and rate2 > 0):
                    spread = abs(rate1 - rate2)
                    arbs.append(
                        {
                            "name": name,
                            "exchanges": f"{ex1}-{ex2}",  # or use tuple: (ex1, ex2)
                            "exchange1": ex1,
                            "exchange2": ex2,
                            "funding_rate_ex1": rate1,
                            "funding_rate_ex2": rate2,
                            "funding_spread": spread,
                        }
                    )

    return pd.DataFrame(arbs).sort_values("funding_spread", ascending=False)

Code

class FundingArbBot:
    config: FundingArbBotConfig

    def __init__(self, config: FundingArbBotConfig):
        self.config = config
        self.current_arb_position = None
        self.extended = ExtendedExchange()
        self.hyperliquid = HyperliquidExchange(config=create_strategy_config_from_env())
        self.current_arb = None

    async def initialize(self):
        await self.extended.initialize()

    async def place_market_order(
        self,
        exchange: Literal[exchanges],
        market_name: str,
        side: Literal["LONG", "SHORT"],
        amount: float,
        reduce_only: bool = False,
    ) -> Optional[str]:
        if exchange == "extended":
            return await self.extended.place_market_order_retry(
                market_name, side, amount, reduce_only, self.config.slippage, 10
            )
        elif exchange == "hyperliquid":
            return self.hyperliquid.place_market_order_retry(
                market_name, side, amount, reduce_only, self.config.slippage, 10
            )
        else:
            raise ValueError(f"Unsupported exchange: {exchange}")

    async def set_leverage(
        self, exchange: Literal[exchanges], market_name: str, leverage: int
    ):
        if exchange == "extended":
            return await self.extended.set_acc_leverage(market_name, leverage)
        elif exchange == "hyperliquid":
            return self.hyperliquid.set_acc_leverage(market_name, leverage)
        else:
            raise ValueError(f"Unsupported exchange: {exchange}")

    async def get_max_leverage(
        self, exchange: Literal[exchanges], market_name: str
    ) -> int:
        if exchange == "extended":
            return await self.extended.get_max_market_leverage(market_name)
        elif exchange == "hyperliquid":
            return self.hyperliquid.get_max_market_leverage(market_name)
        else:
            raise ValueError(f"Unsupported exchange: {exchange}")

    async def get_price(
        self, exchange: Literal[exchanges], symbol: str, side: Literal["LONG", "SHORT"]
    ) -> float:
        if exchange == "extended":
            return await self.extended.get_price(symbol, side)
        elif exchange == "hyperliquid":
            return self.hyperliquid.get_price(symbol)
        else:
            raise ValueError(f"Unsupported exchange: {exchange}")

    async def get_acc_balance(self, exchange: Literal[exchanges]) -> float:
        if exchange == "extended":
            return await self.extended.get_acc_balance()
        elif exchange == "hyperliquid":
            return self.hyperliquid.get_acc_balance()
        else:
            raise ValueError(f"Unsupported exchange: {exchange}")

    async def get_qty(
        self, exchange: Literal[exchanges], symbol: str, amount: float
    ) -> float:
        if exchange == "extended":
            return await self.extended.get_qty(symbol, amount)
        elif exchange == "hyperliquid":
            return self.hyperliquid.get_qty(symbol, amount)
        else:
            raise ValueError(f"Unsupported exchange: {exchange}")

    async def get_active_position(
        self, exchange: Literal[exchanges], symbol: str
    ) -> Optional[ActivePosition]:
        """Get active position for a symbol on the specified exchange."""
        if exchange == "extended":
            return await self.extended.get_active_position(symbol)
        elif exchange == "hyperliquid":
            return self.hyperliquid.get_active_position(symbol)
        else:
            raise ValueError(f"Unsupported exchange: {exchange}")

    async def open_position(self, best_arb: Dict[str, Any], reduce_only: bool = False):
        long_exchange = None
        short_exchange = None

        if best_arb["funding_rate_ex1"] > 0:
            short_exchange = best_arb["exchange1"]
            long_exchange = best_arb["exchange2"]
        else:
            short_exchange = best_arb["exchange2"]
            long_exchange = best_arb["exchange1"]

        long_acc_balance = await self.get_acc_balance(long_exchange)
        short_acc_balance = await self.get_acc_balance(short_exchange)
        if (
            long_acc_balance < self.config.amount_usd
            or short_acc_balance < self.config.amount_usd
        ):
            error_msg = (
                f"not enough balance in LONG:{long_exchange} or SHORT:{short_exchange}"
            )
            logging.error(error_msg)
            logging.error(
                f"long balance: {long_acc_balance}, short balance: {short_acc_balance}"
            )
            send_macos_notification(
                title="Tiny HFT - Insufficient Balance",
                message=f"{error_msg}. Long: ${long_acc_balance:.2f}, Short: ${short_acc_balance:.2f}",
            )
            raise Exception(error_msg)

        # set max leverage first
        max1 = await self.get_max_leverage(long_exchange, best_arb["name"])
        max2 = await self.get_max_leverage(short_exchange, best_arb["name"])
        logging.info(
            f"max leverage for {best_arb['name']} on {long_exchange}: {max1}, on {short_exchange}: {max2}"
        )
        max_leverage = min(max1, max2)
        logging.info(
            f"setting max leverage to {max_leverage} for {best_arb['name']} on {long_exchange} and {short_exchange}"
        )
        await self.set_leverage(long_exchange, best_arb["name"], max_leverage)
        await self.set_leverage(short_exchange, best_arb["name"], max_leverage)
        logging.info(
            f"max leverage set for {best_arb['name']} on {long_exchange} and {short_exchange}"
        )

        # compute quantity
        long_price = float(
            await self.get_price(long_exchange, best_arb["name"], "LONG")
        )
        logging.info(f"current price is {long_price}")
        logging.info(f"current symbol is {best_arb['name']}")

        qty = round(self.config.amount_usd * max_leverage / long_price, 1)
        if long_price < 10:
            qty = ceil(qty)
        qty_1 = float(await self.get_qty(long_exchange, best_arb["name"], qty))
        qty_2 = float(await self.get_qty(short_exchange, best_arb["name"], qty))
        qty = float(min(qty_1, qty_2))
        self.last_qty = float(qty)

        if reduce_only:
            qty = self.last_qty

        logging.info(
            f"{"opening" if reduce_only == False else "closing"} position with qty: {qty}, value: {qty * long_price}$"
        )

        logging.info(
            f"long exchange: {long_exchange}, short exchange: {short_exchange}"
        )

        async def long_t():
            # open long position
            long_order_id = await self.place_market_order(
                long_exchange,
                best_arb["name"],
                "LONG" if reduce_only == False else "SHORT",
                qty,
                reduce_only,
            )
            if long_order_id is None:
                error_msg = f"Failed to open long position on {long_exchange}"
                logging.error(error_msg)
                send_macos_notification(
                    title="Tiny HFT - Order Failed",
                    message=f"{error_msg} for {best_arb['name']}",
                )
                raise Exception(error_msg)

        async def short_t():
            short_order_id = await self.place_market_order(
                short_exchange,
                best_arb["name"],
                "SHORT" if reduce_only == False else "LONG",
                qty,
                reduce_only,
            )
            if short_order_id is None:
                error_msg = f"Failed to open short position on {short_exchange}"
                logging.error(error_msg)
                send_macos_notification(
                    title="Tiny HFT - Order Failed",
                    message=f"{error_msg} for {best_arb['name']}",
                )
                raise Exception(error_msg)

        long_task = asyncio.create_task(long_t())
        short_task = asyncio.create_task(short_t())
        try:
            await asyncio.gather(long_task, short_task)
        except Exception as e:
            error_msg = f"Error during position {'opening' if not reduce_only else 'closing'}: {str(e)}"
            logging.error(error_msg)
            send_macos_notification(
                title="Tiny HFT - Position Error",
                message=f"{error_msg} for {best_arb['name']}",
            )
            raise

    async def close_position(self, best_arb: Dict[str, Any]):
        """Close position with the same logic as open_position but always with reduce_only=True"""
        await self.open_position(best_arb, reduce_only=True)

    def infer_positions(
        self,
        position1: ActivePosition,
        position2: ActivePosition,
        best_arb: Dict[str, Any],
    ) -> bool:
        """
        Check if positions match the best_arb by comparing symbol, size, and exchange.

        Args:
            position1: Active position from exchange1 (must not be None)
            position2: Active position from exchange2 (must not be None)
            best_arb: The best arbitrage opportunity dict (contains exchange1 and exchange2)

        Returns:
            True if positions match the best_arb, False otherwise
        """
        exchange1 = best_arb["exchange1"]
        exchange2 = best_arb["exchange2"]

        # Check if symbols match
        if position1.symbol != best_arb["name"] or position2.symbol != best_arb["name"]:
            logging.info(
                f"Symbol mismatch: positions {position1.symbol}, {position2.symbol} != {best_arb['name']}"
            )
            return False

        # Check if sizes are approximately equal (within 5% tolerance)
        size1 = position1.size
        size2 = position2.size

        if size1 == 0 or size2 == 0:
            logging.info(f"One or both positions have zero size")
            return False

        # Size should be approximately equal for arbitrage
        size_diff = abs(size1 - size2) / max(size1, size2)
        if size_diff > 0.05:  # 5% tolerance
            logging.info(f"Size mismatch: {size1} vs {size2}, diff: {size_diff:.2%}")
            return False

        logging.info(
            f"Positions match best_arb: symbol={best_arb['name']}, exchanges={exchange1}/{exchange2}, sizes={size1:.2f}/{size2:.2f}"
        )
        return True

    def filter_and_sort_arbs(self, records_df: pd.DataFrame) -> pd.DataFrame:
        """
        Filter arbitrage records by configured exchanges and sort by funding spread.

        Args:
            records_df: DataFrame containing arbitrage records

        Returns:
            Filtered and sorted DataFrame
        """
        filtered_df = records_df[
            records_df["exchange1"].isin(self.config.exchanges)
            & records_df["exchange2"].isin(self.config.exchanges)
        ]
        return filtered_df.sort_values("funding_spread", ascending=False)

    async def init_try_infer_position_or_close(self):
        """
        Try to infer existing positions at startup. If positions are found:
        - If only one position exists on one exchange, close it
        - If both positions exist and match the best arb, set current_arb
        This runs only once at startup before the main loop.
        """
        try:
            records = get_cached_arbs()
            if records is None:
                return

            records_df = pd.DataFrame(records)
            records_df_filtered = self.filter_and_sort_arbs(records_df)

            if records_df_filtered.empty:
                return

            best_arb_candidate = records_df_filtered.iloc[0]
            symbol = best_arb_candidate["name"]

            # Get positions from both exchanges
            exchange1_name = best_arb_candidate["exchange1"]
            exchange2_name = best_arb_candidate["exchange2"]
            position1 = await self.get_active_position(exchange1_name, symbol)
            position2 = await self.get_active_position(exchange2_name, symbol)

            # Handle case where position is open only on one exchange
            if position1 is not None and position2 is None:
                logging.info(f"Position found only on {exchange1_name}, closing it")
                # Use exchange-specific close method
                if exchange1_name == "hyperliquid":
                    self.hyperliquid.close_order(symbol)
                elif exchange1_name == "extended":
                    await self.extended.close_current_opened_position(
                        position1.market_name
                    )
                else:
                    logging.error(f"Unsupported exchange for closing: {exchange1_name}")
            elif position1 is None and position2 is not None:
                logging.info(f"Position found only on {exchange2_name}, closing it")
                # Use exchange-specific close method
                if exchange2_name == "hyperliquid":
                    self.hyperliquid.close_order(symbol)
                elif exchange2_name == "extended":
                    await self.extended.close_current_opened_position(
                        position2.market_name
                    )
                else:
                    logging.error(f"Unsupported exchange for closing: {exchange2_name}")
            elif position1 is not None and position2 is not None:
                # Both positions exist, check if they match best_arb
                best_arb_dict = best_arb_candidate.to_dict()
                if self.infer_positions(position1, position2, best_arb_dict):
                    logging.info(
                        f"Inferred existing position matches best_arb: {symbol}"
                    )
                    self.current_arb = best_arb_candidate
        except Exception as e:
            logging.warning(f"Error inferring positions: {e}")
            # Continue to normal flow if inference fails

    async def run(self):
        # Try to infer existing positions first (run only once at startup)
        await self.init_try_infer_position_or_close()

        while True:
            try:
                await asyncio.sleep(10)
                records = get_cached_arbs()
                if records is None:
                    logging.error("No arbs found")
                    continue

                records_df = pd.DataFrame(records)

                # if there is no current open position open at the best arb
                if self.current_arb is None:
                    # filtereed by exchanges from config
                    records_df = self.filter_and_sort_arbs(records_df)
                    if records_df.empty:
                        logging.info("no arbs found for our needed exchanges")
                        continue

                    best_arb = records_df.iloc[0]

                    yearly_roi = best_arb["funding_spread"] * 365 * 24 * 100
                    logging.info(
                        f"NEW: {best_arb['name']} with yearly roi: {yearly_roi}%"
                    )

                    if yearly_roi < self.config.min_yearly_roi:
                        logging.info(
                            f"yearly ROI {yearly_roi}% is not enough, no new position, min ROI ({self.config.min_yearly_roi}%)"
                        )
                        continue

                    await self.open_position(best_arb)
                    self.current_arb = best_arb

                # otherwise, check if the arb is still valid, if not then close the position and unset the current arb
                else:
                    last_arb = self.current_arb
                    records_df = self.filter_and_sort_arbs(records_df)
                    if records_df.empty:
                        logging.info("no arbs found for our needed exchanges")
                        continue

                    current_best_arb = records_df.iloc[0]
                    current_best_yearly_roi = (
                        current_best_arb["funding_spread"] * 365 * 24 * 100
                    )
                    logging.info(
                        f"NEW: {current_best_arb['name']} with yearly roi: {current_best_yearly_roi}%"
                    )

                    # if you find the latest best arb to cross the `switch_when` threshold, then switch to it
                    if current_best_yearly_roi > self.config.switch_when:
                        logging.info(
                            f"NEW: yearly ROI is greater than the switch when {self.config.switch_when}%, switching to the current best arb"
                        )
                        # close the current position
                        await self.close_position(current_best_arb)
                        self.current_arb = current_best_arb
                        continue

                    get_last_arb_info = records_df[
                        records_df["name"].isin([last_arb["name"]])
                        & records_df["exchange1"].isin([last_arb["exchange1"]])
                        & records_df["exchange2"].isin([last_arb["exchange2"]])
                    ]

                    if get_last_arb_info.empty:
                        logging.info("last arb is no longer valid")
                        await self.close_position(last_arb)
                        self.current_arb = None
                        continue

                    best_arb = get_last_arb_info.iloc[0]
                    yearly_roi = best_arb["funding_spread"] * 365 * 24 * 100
                    logging.info(
                        f"CURRENT: {best_arb['name']} with yearly roi: {yearly_roi}%"
                    )

                    if yearly_roi < self.config.min_yearly_roi:
                        logging.info(
                            f"CURRENT: yearly ROI i.e {self.config.min_yearly_roi}%, it too low, closing position"
                        )
                        await self.close_position(last_arb)
                        self.current_arb = None
                        continue
                    else:
                        logging.info(
                            f"CURRENT yearly ROI is still valid, currently it's {yearly_roi}%"
                        )
            except Exception as e:
                error_message = str(e)
                logging.error(f"Error in run: {error_message}")
                send_macos_notification(
                    title="Tiny HFT - Bot Error",
                    message=f"Funding arb bot error: {error_message}",
                )
                raise e


# gunicorn --bind 0.0.0.0:8001 --workers 1 --threads 4 --worker-class sync strategies.funding_arb:app --reload
async def main():
    # Pre-populate cache on startup
    update_combined_rates_cache()
    update_arbs_cache()
    print("pre-populated cache")
    config = FundingArbBotConfig(
        exchanges=["hyperliquid", "extended"],
        amount_usd=100,
        slippage=0.01,
        min_yearly_roi=200,
        switch_when=1500,
    )

    logging.info(f"Config variables:")
    logging.info(f"exchanges: {', '.join(config.exchanges)}")
    logging.info(f"amount_usd: {config.amount_usd}$")
    logging.info(f"slippage: {config.slippage*100}%")
    logging.info(f"min_yearly_roi: {config.min_yearly_roi}%")
    logging.info(f"switch_when: {config.switch_when}%")

    bot = FundingArbBot(config=config)
    await bot.initialize()
    send_macos_notification(
        title="Tiny HFT - Bot Started",
        message=f"Funding arb bot has started successfully. Exchanges: {', '.join(config.exchanges)}",
    )
    await bot.run()


if __name__ == "__main__":
    asyncio.run(main())

It computes the arbitrage opportunities and caches them for 1 minute.

FUNDING_FETCHER: Dict[Literal[exchanges], Callable] = {
    "extended": get_markets_info_from_extended,
    "hibachi": get_markets_info_from_hibachi,
    "hyperliquid": get_markets_info_from_hyperliquid,
    "lighter": get_markets_info_from_lighter,
    "pacifica": get_markets_info_from_pacifica,
    # "variational": get_markets_info_from_variational,
}


app = Flask(
    __name__, template_folder=os.path.join(os.path.dirname(__file__), "templates")
)

# Cache configuration
CACHE_TTL_SECONDS = 1 * 60
cache_lock = threading.Lock()
data_cache = {
    "combined_rates": {
        "data": None,
        "timestamp": 0,
        "updating": False,
    },
    "arbs": {
        "data": None,
        "timestamp": 0,
        "updating": False,
    },
}


def convert_nan_to_none(value):
    """Convert NaN values to None for JSON serialization"""
    try:
        if pd.isna(value):
            return None
    except (TypeError, ValueError):
        # If pd.isna fails, just return the value as-is
        pass
    return value


def is_cache_stale(cache_entry: Dict[str, Any]) -> bool:
    """Check if cache entry is stale (older than TTL)"""
    if cache_entry["data"] is None:
        return True
    age = time.time() - cache_entry["timestamp"]
    return age >= CACHE_TTL_SECONDS


def update_combined_rates_cache():
    """Update the combined rates cache"""
    try:
        df = combined_funding_rates()
        if df.empty:
            records = []
        else:
            # Convert DataFrame to JSON (orient='records' gives a list of dictionaries)
            records = df.to_dict(orient="records")
            # Replace NaN values with None for JSON serialization
            records = [
                {k: convert_nan_to_none(v) for k, v in record.items()}
                for record in records
            ]

        with cache_lock:
            cache_entry = data_cache["combined_rates"]
            cache_entry["data"] = records
            cache_entry["timestamp"] = time.time()
            cache_entry["updating"] = False
    except Exception as e:
        import logging
        import traceback

        logging.error(f"Error updating combined_rates cache: {e}")
        logging.error(traceback.format_exc())


def update_arbs_cache():
    """Update the arbitrage opportunities cache"""
    try:
        df = get_arbs_df()
        if df.empty:
            records = []
        else:
            # Convert DataFrame to JSON (orient='records' gives a list of dictionaries)
            records = df.to_dict(orient="records")
            # Replace NaN values with None for JSON serialization
            records = [
                {k: convert_nan_to_none(v) for k, v in record.items()}
                for record in records
            ]

        with cache_lock:
            cache_entry = data_cache["arbs"]
            cache_entry["data"] = records
            cache_entry["timestamp"] = time.time()
            cache_entry["updating"] = False
    except Exception as e:
        import logging
        import traceback

        logging.error(f"Error updating arbs cache: {e}")
        logging.error(traceback.format_exc())


def get_cached_combined_rates():
    """Get combined rates from cache, updating if stale"""
    thread = None
    with cache_lock:
        cache_entry = data_cache["combined_rates"]
        stale = is_cache_stale(cache_entry)
        updating = cache_entry["updating"]
        old_data = cache_entry["data"]

        if stale and not updating:
            # Mark as updating and start background thread
            cache_entry["updating"] = True
            thread = threading.Thread(target=update_combined_rates_cache, daemon=True)
            thread.start()
            # Return old data if available, while update happens in background
            if old_data is not None:
                return old_data

    # If we get here, either cache is fresh or we're waiting for initial data
    if stale and old_data is None and thread is not None:
        # Wait for initial data from thread we just started
        thread.join()
        with cache_lock:
            return data_cache["combined_rates"]["data"]
    elif stale and old_data is None:
        # Another thread is updating, wait a bit and retry
        time.sleep(0.1)
        with cache_lock:
            return data_cache["combined_rates"]["data"]
    elif not stale:
        return old_data
    else:
        # Cache is updating, return old data if available
        return old_data


def get_cached_arbs():
    """Get arbitrage opportunities from cache, updating if stale"""
    thread = None
    with cache_lock:
        cache_entry = data_cache["arbs"]
        stale = is_cache_stale(cache_entry)
        updating = cache_entry["updating"]
        old_data = cache_entry["data"]

        if stale and not updating:
            # Mark as updating and start background thread
            cache_entry["updating"] = True
            thread = threading.Thread(target=update_arbs_cache, daemon=True)
            thread.start()
            # Return old data if available, while update happens in background
            if old_data is not None:
                return old_data

    # If we get here, either cache is fresh or we're waiting for initial data
    if stale and old_data is None and thread is not None:
        # Wait for initial data from thread we just started
        thread.join()
        with cache_lock:
            return data_cache["arbs"]["data"]
    elif stale and old_data is None:
        # Another thread is updating, wait a bit and retry
        time.sleep(0.1)
        with cache_lock:
            return data_cache["arbs"]["data"]
    elif not stale:
        return old_data
    else:
        # Cache is updating, return old data if available
        return old_data