Skip to content

drift-cataloger Prefect Flow Tasks#

drift_cataloger.get_funding_rate_records #

get_funding_rate_records(symbol: str, date_: date | None = None) -> list[FundingRateRecord]

Fetch funding rate records for a given symbol from the Drift API.

This function retrieves funding rate records for a specific trading pair symbol. If a specific date is provided, it fetches funding rates for that date; otherwise, it fetches all available funding rates for the symbol from the last 31 days. This is specified in the Drift API playground.

The entire list of supported symbols can be found on the GET /contracts endpoint, but the recommended symbols to use are SOL-PERP, BTC-PERP, and ETH-PERP.

Parameters:

Name Type Description Default
symbol str

The trading pair symbol to fetch funding rates for.

required
date_ date | None

The specific day to fetch funding rates for. If None, fetches all available funding rates for the symbol from the last 31 days. Defaults to None.

None

Returns:

Type Description
list[FundingRateRecord]

list[FundingRateRecord]: A list of funding rate records for the specified symbol and date.

Source code in drift_cataloger/data_api.py
@task(
    name="get-funding-rate-records",
    description="Fetch funding rate records from Drift API",
    cache_policy=MAGISTRATE_CACHE_POLICY,
    retries=10,
    retry_delay_seconds=exponential_backoff(2),
)
def get_funding_rate_records(
    symbol: str,
    date_: date | None = None,
) -> list[FundingRateRecord]:
    """Fetch funding rate records for a given symbol from the Drift API.

    This function retrieves funding rate records for a specific trading pair symbol. If
    a specific date is provided, it fetches funding rates for that date; otherwise, it
    fetches all available funding rates for the symbol from the last 31 days. This is
    specified in the [Drift API playground](https://data.api.drift.trade/playground).

    The entire list of supported symbols can be found on the [`GET /contracts`](https://data.api.drift.trade/contracts)
    endpoint, but the recommended symbols to use are `SOL-PERP`, `BTC-PERP`, and
    `ETH-PERP`.

    Args:
        symbol (str): The trading pair symbol to fetch funding rates for.
        date_ (date | None, optional): The specific day to fetch funding rates for.
            If None, fetches all available funding rates for the symbol from the last 31
            days. Defaults to None.

    Returns:
        list[FundingRateRecord]: A list of funding rate records for the specified
            symbol and date.
    """
    msg = f"Fetching funding rate records for symbol: `{symbol}`"
    if date_ is not None:
        msg += f" on date: `{date_}`"
    logger.info(msg)

    client = Client(cluster="mainnet")
    if date_ is not None:
        return client.get_funding_rates(
            symbol=symbol,
            year=date_.year,
            month=date_.month,
            day=date_.day,
        )
    rates = client.get_funding_rates(symbol=symbol)
    logger.info(f"Fetched {len(rates)} funding rate records for symbol: `{symbol}`")
    return rates

drift_cataloger.publish_funding_rates_to_db #

publish_funding_rates_to_db(records: Sequence[FundingRateRecord], engine: Engine | None = None) -> None

Publish funding rate records to the database, merging them with existing records.

Parameters:

Name Type Description Default
records Sequence[FundingRateRecord]

A sequence of funding rate records to publish to the database.

required
engine Engine | None

SQLAlchemy engine, if not provided, the default engine will be used. Defaults to None.

None
Source code in drift_cataloger/data_api.py
@task(
    name="publish-to-db",
    description="Publish records to the database",
    cache_policy=MAGISTRATE_CACHE_POLICY,
    retries=10,
    retry_delay_seconds=exponential_backoff(1),
)
def publish_funding_rates_to_db(
    records: Sequence[FundingRateRecord],
    engine: Engine | None = None,
) -> None:
    """Publish funding rate records to the database, merging them with existing records.

    Args:
        records (Sequence[FundingRateRecord]): A sequence of funding rate records to
            publish to the database.
        engine (Engine | None, optional): SQLAlchemy engine, if not provided, the
            default engine will be used. Defaults to None.
    """
    rates = [FundingRatesAPI.model_validate(r) for r in records]
    logger.info(f"Publishing {len(rates)} funding rate records to the database")
    with Session(resolve_engine(engine), expire_on_commit=True) as session:
        for rate in rates:
            session.merge(rate)
        session.commit()
    logger.success("Successfully published funding rate records to the database")

drift_cataloger.publish_funding_rate_data #

publish_funding_rate_data(symbol: str, date_: date | None = None, engine: Engine | None = None) -> None

Fetch and publish funding rate data for a specific symbol.

This function retrieves funding rate records for a given trading pair symbol from the Drift API. If a specific date is provided, it fetches funding rates for that date; otherwise, it fetches all available funding rates for the symbol from the last 31 days. The fetched records are then checked against the database to see if they have already been published. If not, the new records are published to the database.

Parameters:

Name Type Description Default
symbol str

The trading pair symbol to fetch funding rates for.

required
date_ date | None

The specific day to fetch funding rates for. If None, fetches all available funding rates for the symbol from the last 31 days. Defaults to None.

None
engine Engine | None

SQLAlchemy engine, if not provided, the default engine will be used. Defaults to None.

None
Source code in drift_cataloger/data_api.py
@task(
    name="fetch-latest-funding-rate-data",
    description="Fetch the latest funding rate data from Drift API",
    cache_policy=MAGISTRATE_CACHE_POLICY,
)
def publish_funding_rate_data(
    symbol: str,
    date_: date | None = None,
    engine: Engine | None = None,
) -> None:
    """Fetch and publish funding rate data for a specific symbol.

    This function retrieves funding rate records for a given trading pair symbol from
    the Drift API. If a specific date is provided, it fetches funding rates for that
    date; otherwise, it fetches all available funding rates for the symbol from the last
    31 days. The fetched records are then checked against the database to see if they
    have already been published. If not, the new records are published to the database.

    Args:
        symbol (str): The trading pair symbol to fetch funding rates for.
        date_ (date | None, optional): The specific day to fetch funding rates for.
            If None, fetches all available funding rates for the symbol from the last 31
            days. Defaults to None.
        engine (Engine | None, optional): SQLAlchemy engine, if not provided, the
            default engine will be used. Defaults to None.
    """
    engine = resolve_engine(engine)
    records = get_funding_rate_records(symbol=symbol, date_=date_)
    if len(records) == 0:
        msg = f"No funding rate records found for symbol: `{symbol}`"
        if date_ is not None:
            msg += f" on date: `{date_}`"
        logger.warning(msg)
        return
    publish_funding_rates_to_db(records=records, engine=engine)