Skip to content

wright-way Prefect Flow Tasks#

The Wright Way flows are built using the tasks below. These tasks do the bulk of the work, but still are relatively simple. They do things like initializing the database, CRUD operations on the database, pulling animal data from the website, and sending alerts when new animals are found.

wright_way.initialize_db #

initialize_db(engine: Engine | None = None) -> None

Initialize the Wright Way database, if not already initialized.

Parameters:

Name Type Description Default
engine Engine | None

SQLAlchemy engine, if not provided, the default engine will be used. Defaults to None.

None
Source code in wright_way/orchestration.py
@task(
    name="init-db",
    description="Initialize Wright Way DB, if not initialized",
    cache_policy=MAGISTRATE_CACHE_POLICY,
)
def initialize_db(engine: Engine | None = None) -> None:
    """Initialize the Wright Way database, if not already initialized.

    Args:
        engine (Engine | None, optional): SQLAlchemy engine, if not provided, the
            default engine will be used. Defaults to None.
    """
    logger.info("Initializing Wright Way DB, if not initialized")
    initialize_schemas(engine=engine)
    SQLModel.metadata.create_all(resolve_engine(engine))

wright_way.get_wright_way_db_ids #

get_wright_way_db_ids(engine: Engine | None = None) -> Sequence[int]

Get all Wright Way Petango IDs from the local database.

Parameters:

Name Type Description Default
engine Engine | None

SQLAlchemy engine, if not provided, the default engine will be used. Defaults to None.

None

Returns:

Type Description
Sequence[int]

Sequence[int]: A sequence of Wright Way Petango IDs in the database.

Source code in wright_way/orchestration.py
@task(
    name="pull-db-ids",
    description="Get all Wright Way Petango IDs from DB",
    cache_policy=MAGISTRATE_CACHE_POLICY,
)
def get_wright_way_db_ids(engine: Engine | None = None) -> Sequence[int]:
    """Get all Wright Way Petango IDs from the local database.

    Args:
        engine (Engine | None, optional): SQLAlchemy engine, if not provided, the
            default engine will be used. Defaults to None.

    Returns:
        Sequence[int]: A sequence of Wright Way Petango IDs in the database.
    """
    logger.info("Pulling all Wright Way Petango IDs from DB")
    with Session(resolve_engine(engine), expire_on_commit=False) as session:
        _ids = session.scalars(select(Animal.petango_id)).unique().all()
        logger.info(f"Found {len(_ids)} Petango IDs in DB")
        return _ids

wright_way.get_wright_way_ids #

get_wright_way_ids(client: WrightWayScraper | None = None) -> list[int]

Get all active Wright Way Petango IDs from the Wright Way website.

Parameters:

Name Type Description Default
client WrightWayScraper | None

Wright Way Scraper client, if not provided, a new client will be created. Defaults to None.

None

Returns:

Type Description
list[int]

list[int]: List of active Wright Way Petango IDs.

Source code in wright_way/orchestration.py
@task(name="pull-ids", description="Get all active Wright Way Petango IDs")
def get_wright_way_ids(client: WrightWayScraper | None = None) -> list[int]:
    """Get all active Wright Way Petango IDs from the Wright Way website.

    Args:
        client (WrightWayScraper | None, optional): Wright Way Scraper client, if not
            provided, a new client will be created. Defaults to None.

    Returns:
        list[int]: List of active Wright Way Petango IDs.
    """
    logger.info("Pulling all active Wright Way Petango IDs")
    client = WrightWayScraper() if client is None else client
    _ids = client.get_animal_ids()
    logger.info(f"Found {len(_ids)} active Petango IDs")
    return _ids

wright_way.get_animal #

get_animal(id_: int, client: WrightWayScraper | None = None) -> Animal

Get an SQLAlchemy Animal instance by Petango ID.

Parameters:

Name Type Description Default
id_ int

Petango ID of the animal.

required
client WrightWayScraper | None

Wright Way Scraper client, if not provided, a new client will be created. Defaults to None.

None

Returns:

Name Type Description
Animal Animal

SQLAlchemy model instance of the animal.

Source code in wright_way/orchestration.py
@task(name="pull-animal", description="Get Animal pydantic instance by Petango ID")
def get_animal(id_: int, client: WrightWayScraper | None = None) -> Animal:
    """Get an SQLAlchemy Animal instance by Petango ID.

    Args:
        id_ (int): Petango ID of the animal.
        client (WrightWayScraper | None, optional): Wright Way Scraper client, if not
            provided, a new client will be created. Defaults to None.

    Returns:
        Animal: SQLAlchemy model instance of the animal.
    """
    logger.info(f"Pulling Animal with Petango ID: {id_}")
    client = WrightWayScraper() if client is None else client
    _animal = client.get_animal(id_)

    if isinstance(_animal.Birthdate, str):
        msg = f"Expected optional datetime, not string: {_animal.Birthdate}"
        raise TypeError(msg)

    if isinstance(_animal.Intake_Date, str):
        msg = f"Expected datetime, not string: {_animal.Intake_Date}"
        raise TypeError(msg)

    return Animal(
        petango_id=_animal.id,
        name=_animal.Name,
        species=_animal.Species,
        breed=_animal.Breed,
        birthdate=_animal.Birthdate,
        gender=_animal.Gender,
        size=_animal.Size,
        color=_animal.Color,
        declawed=_animal.Declawed,
        housetrained=_animal.Housetrained,
        location=_animal.Location,
        intake_date=_animal.Intake_Date,
        stage=_animal.Stage,
        url=_animal.URL,
        profile_url=_animal.Profile_URL,
    )

wright_way.add_animals_to_db #

add_animals_to_db(animals: Collection[Animal], engine: Engine | None = None) -> None

Add animals to the Wright Way local database.

Parameters:

Name Type Description Default
animals Collection[Animal]

List of animals to add to the database.

required
engine Engine | None

SQLAlchemy engine, if not provided, the default engine will be used. Defaults to None.

None
Source code in wright_way/orchestration.py
@task(
    name="add-animals",
    description="Add animals to DB",
    cache_policy=MAGISTRATE_CACHE_POLICY,
)
def add_animals_to_db(
    animals: Collection[Animal],
    engine: Engine | None = None,
) -> None:
    """Add animals to the Wright Way local database.

    Args:
        animals (Collection[Animal]): List of animals to add to the database.
        engine (Engine | None, optional): SQLAlchemy engine, if not provided, the
            default engine will be used. Defaults to None.
    """
    logger.info(f"Adding {len(animals)} animals to DB")
    with Session(resolve_engine(engine), expire_on_commit=False) as session:
        for animal in animals:
            session.add(animal)
        session.commit()

wright_way.delete_animals_from_db #

delete_animals_from_db(petango_ids: Collection[int] | None = None, engine: Engine | None = None) -> None

Delete animals from the Wright Way local database by Petango ID.

Parameters:

Name Type Description Default
petango_ids Collection[int] | None

List of Petango IDs to delete from the database. If None, all animals will be deleted. Defaults to None.

None
engine Engine | None

SQLAlchemy engine, if not provided, the default engine will be used. Defaults to None.

None
Source code in wright_way/orchestration.py
@task(
    name="delete-animals",
    description="Delete animals from DB by Petango ID",
    cache_policy=MAGISTRATE_CACHE_POLICY,
)
def delete_animals_from_db(
    petango_ids: Collection[int] | None = None,
    engine: Engine | None = None,
) -> None:
    """Delete animals from the Wright Way local database by Petango ID.

    Args:
        petango_ids (Collection[int] | None, optional): List of Petango IDs to delete
            from the database. If None, all animals will be deleted. Defaults to None.
        engine (Engine | None, optional): SQLAlchemy engine, if not provided, the
            default engine will be used. Defaults to None.
    """
    with Session(resolve_engine(engine), expire_on_commit=False) as session:
        if petango_ids is None:
            logger.info("Deleting all animals from DB")
            statement = select(Animal)
        else:
            logger.info(f"Deleting {len(petango_ids)} animals from DB")
            statement = select(Animal).where(col(Animal.petango_id).in_(petango_ids))

        for animal in session.exec(statement).all():
            session.delete(animal)
        session.commit()

wright_way.update_animals_in_db #

update_animals_in_db(animals: Collection[Animal], engine: Engine | None = None) -> None

Update animals in the Wright Way local database by Petango ID.

Parameters:

Name Type Description Default
animals Collection[Animal]

List of animals to update in the database.

required
engine Engine | None

SQLAlchemy engine, if not provided, the default engine will be used. Defaults to None.

None
Source code in wright_way/orchestration.py
@task(
    name="update-animals",
    description="Update animals in DB by Petango ID",
    cache_policy=MAGISTRATE_CACHE_POLICY,
)
def update_animals_in_db(
    animals: Collection[Animal],
    engine: Engine | None = None,
) -> None:
    """Update animals in the Wright Way local database by Petango ID.

    Args:
        animals (Collection[Animal]): List of animals to update in the database.
        engine (Engine | None, optional): SQLAlchemy engine, if not provided, the
            default engine will be used. Defaults to None.
    """
    logger.info(f"Updating {len(animals)} animals in DB")
    with Session(resolve_engine(engine), expire_on_commit=False) as session:
        for animal in animals:
            statement = select(Animal).where(Animal.petango_id == animal.petango_id)
            _animal = session.exec(statement).first()
            if _animal is None:
                msg = f"Animal with Petango ID {animal.petango_id} not found in DB"
                raise ValueError(msg)

            animal.id = _animal.id
            animal.created_at = _animal.created_at

            session.delete(_animal)
            session.add(animal)

        session.commit()

wright_way.alert_new_animal async #

alert_new_animal(animal: int | Animal, channel_name_or_id: str, engine: Engine | None = None) -> None

Publish a slack message about a new animal at Wright Way.

Parameters:

Name Type Description Default
animal int | Animal

Petango ID of the animal or the Animal instance.

required
channel_name_or_id str

Slack channel to publish the message to.

required
engine Engine | None

SQLAlchemy engine, if not provided, the default engine will be used. Defaults to None.

None
Source code in wright_way/orchestration.py
@task(
    name="alert-new-animals",
    description="Alert of a new animal at Wright Way",
    cache_policy=MAGISTRATE_CACHE_POLICY,
    retries=10,
    retry_delay_seconds=exponential_backoff(1),
)
async def alert_new_animal(
    animal: int | Animal,
    channel_name_or_id: str,
    engine: Engine | None = None,
) -> None:
    """Publish a slack message about a new animal at Wright Way.

    Args:
        animal (int | Animal): Petango ID of the animal or the Animal instance.
        channel_name_or_id (str): Slack channel to publish the message to.
        engine (Engine | None, optional): SQLAlchemy engine, if not provided, the
            default engine will be used. Defaults to None.
    """
    if isinstance(animal, int):
        with Session(resolve_engine(engine), expire_on_commit=False) as session:
            statement = select(Animal).where(Animal.petango_id == animal)
            _animal = session.exec(statement).first()
            if _animal is None:
                msg = f"Animal with Petango ID {animal} not found in DB"
                raise ValueError(msg)
    else:
        _animal = animal

    logger.info(f"Sending new animal to Slack: {_animal.name} ({_animal.petango_id})")

    if _animal.birthdate is not None:
        delta = relativedelta(
            datetime.now(tz=_animal.birthdate.tzinfo),
            _animal.birthdate,
        )
        _age = f"{delta.months} months, {delta.days} days"
        if delta.years > 0:
            _age = f"{delta.years} years, {_age}"
        _birthdate = f"{_animal.birthdate.date()} ({_age})"
    else:
        _birthdate = "N/A"

    resp = await SLACK_ASSISTANT.post_message(
        channel_name_or_id=channel_name_or_id,
        text=f"New animal at Wright Way: {_animal.name} ({_animal.petango_id})",
        blocks=[
            {"type": "divider"},
            {
                "type": "section",
                "text": {
                    "type": "mrkdwn",
                    "text": dedent(f"""
                            *<{_animal.url}|{_animal.name}>*
                            *Species:* {_animal.species}
                            *Breed:* {_animal.breed}
                            *Gender:* {_animal.gender}
                            *Birthdate:* {_birthdate}
                            *Intake Date:* {_animal.intake_date}
                            *Stage:* {_animal.stage}
                            """),
                },
                "accessory": {
                    "type": "image",
                    "image_url": _animal.profile_url,
                    "alt_text": _animal.name,
                },
            },
            {"type": "divider"},
        ],
    )
    if not resp.get("ok", False):
        msg = f"Failed to post message to {channel_name_or_id}"
        raise ConnectionError(msg)