drift-cataloger Prefect Flows

The Drift Cataloger project pulls the latest data from the Drift API and stores it in a database. This data is then used downstream to support trading positions on the Drift Protocol.

drift_cataloger.load_drift_api

load_drift_api(date_: date | list[date] | None = None, engine: Engine | None = None) -> None

Pull funding rate data from the Drift API and publish it to the database.

This function initializes the Drift database if it is not already initialized, then fetches and publishes funding rate data for the SOL-PERP, BTC-PERP, and ETH-PERP symbols on the specified dates. It submits publish_funding_rate_data once per symbol and date combination; the submitted calls run concurrently on a thread pool of up to 8 workers, and the flow waits for all of them to finish.
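
The fan-out over symbol and date combinations can be illustrated without Prefect. The following is a minimal sketch that uses only the standard library's concurrent.futures; it is not the flow's implementation, which submits Prefect tasks to a ThreadPoolTaskRunner. The names fake_publish and fan_out are hypothetical stand-ins, and the dates are arbitrary sample values.

```python
from __future__ import annotations

from concurrent.futures import ThreadPoolExecutor, wait
from datetime import date
from itertools import product


def fake_publish(symbol: str, date_: date | None) -> tuple[str, date | None]:
    """Hypothetical stand-in for publish_funding_rate_data; performs no I/O."""
    return (symbol, date_)


def fan_out(
    symbols: list[str],
    dates: list[date | None],
    max_workers: int = 8,
) -> set[tuple[str, date | None]]:
    """Run fake_publish once per (symbol, date) pair on a thread pool."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(fake_publish, s, d) for s, d in product(symbols, dates)]
        done, _ = wait(futures)  # block until every submitted call has finished
    return {f.result() for f in done}


# Three symbols and two sample dates give one call per combination.
results = fan_out(
    ["SOL-PERP", "BTC-PERP", "ETH-PERP"],
    [date(2024, 1, 1), date(2024, 1, 2)],
)
print(len(results))
```

Because each call is independent, the total number of submitted calls is the number of symbols multiplied by the number of dates.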

Parameters:

Name: date_
Type: date | list[date] | None
Description: The specific date or list of dates to fetch funding rates for. If None, fetches all available funding rates for the symbols from the last 31 days.
Default: None

Name: engine
Type: Engine | None
Description: SQLAlchemy engine. If not provided, the default engine is used.
Default: None

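
As a rough illustration of how the date_ argument is handled, the sketch below mirrors the flow's normalization step (a non-list value is wrapped in a single-element list) and builds a list of dates for a backfill. The helpers normalize_dates and date_range are hypothetical names introduced here for illustration and are not part of drift_cataloger; the dates are arbitrary sample values.

```python
from __future__ import annotations

from datetime import date, timedelta


def normalize_dates(date_: date | list[date] | None) -> list[date | None]:
    """Mirror the flow's normalization: wrap any non-list value in a list."""
    return date_ if isinstance(date_, list) else [date_]


def date_range(start: date, end: date) -> list[date]:
    """Hypothetical helper: every calendar day from start to end, inclusive."""
    return [start + timedelta(days=i) for i in range((end - start).days + 1)]


# A three-day backfill window, expressed as an explicit list of dates.
backfill = date_range(date(2024, 1, 1), date(2024, 1, 3))
print(normalize_dates(None))
print(normalize_dates(backfill))
```

Under the documented signature, a list built this way could be passed as load_drift_api(date_=backfill), while omitting date_ leaves it as None and falls back to the default 31-day window.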
Source code in drift_cataloger/data_api.py
@flow(  # pyright: ignore[reportCallIssue]
    name="drift-api-loader",
    task_runner=ThreadPoolTaskRunner(max_workers=8),  # pyright: ignore[reportArgumentType]
    on_failure=[SLACK_ASSISTANT.flow_status_hook()],
    on_crashed=[SLACK_ASSISTANT.flow_status_hook()],
)
def load_drift_api(
    date_: date | list[date] | None = None,
    engine: Engine | None = None,
) -> None:
    """Pull funding rate data from the Drift API and publish it to the database.

    This function initializes the Drift database if it is not already initialized, then
    fetches and publishes funding rate data for the specified symbols and dates. This
    calls `publish_funding_rate_data` for each symbol and date combination on different
    threads to speed up the process.

    Args:
        date_ (date | list[date] | None, optional): The specific date or list of dates
            to fetch funding rates for. If None, fetches all available funding rates for
            the symbols from the last 31 days. Defaults to None.
        engine (Engine | None, optional): SQLAlchemy engine, if not provided, the
            default engine will be used. Defaults to None.
    """
    enable_loguru_support()
    engine = resolve_engine(engine)
    initialize_db(engine=engine)

    symbols = ["SOL-PERP", "BTC-PERP", "ETH-PERP"]
    dates_ = date_ if isinstance(date_, list) else [date_]
    wait(
        [
            publish_funding_rate_data.submit(symbol=s, date_=d, engine=engine)
            for s in symbols
            for d in dates_
        ],
    )

    logger.success("Drift API data loading completed.")