
Slack Utilities#

magistrate.SlackAssistant #

Bases: Block

A Prefect block for interacting with Slack.

This block provides methods for posting messages to Slack channels and for other Slack API interactions. It aims to make working with the Slack API easier than using slack_sdk directly or the default Prefect Slack integration.

Attributes:

Name Type Description
token SecretStr | str | None

Bot user OAuth token for the Slack app used to perform actions. Can be set as an environment variable with the name PREFECT_SLACK_TOKEN. Requires the scopes channels:join, channels:read, chat:write, files:read, files:write, groups:read, im:read, mpim:read, and remote_files:write.

file_upload async #

file_upload(*, channel_name_or_id: str | None = None, filename: str | None = None, file: str | Path | bytes | None = None, content: str | bytes | None = None, title: str | None = None, alt_text: str | None = None, initial_comment: str | None = None) -> dict

Upload a file to a Slack channel.

This method is largely a thin wrapper around files_upload_v2, which itself wraps files.getUploadURLExternal and files.completeUploadExternal. The slack_sdk library does not document files_upload_v2, so the 3.19.0 release notes are the best place to look for more information.

files_upload_v2 has a slight delay in uploading files (see slackapi/python-slack-sdk#1521), so this method polls the Slack API to confirm that the files have actually been uploaded. This is done by calling files_info on the uploaded file ID until the file is found or the maximum number of retries is reached. The dict returned by files_info is returned by this method.
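
The polling step described above could look roughly like the following sketch. The private `_check_file_upload` method is not shown on this page, so the function name `poll_until_found`, its parameters, and the retry and delay defaults are all assumptions. `fetch` is a hypothetical stand-in for a call to files_info that returns a dict with an `ok` key.

```python
import asyncio
from collections.abc import Awaitable, Callable


async def poll_until_found(
    fetch: Callable[[], Awaitable[dict]],
    *,
    max_retries: int = 5,
    delay_seconds: float = 1.0,
) -> dict:
    """Call `fetch` until it reports success or the retries run out.

    Illustrative only: the real method's retry count and delay are unknown.
    """
    for attempt in range(max_retries):
        resp = await fetch()
        if resp.get("ok", False):
            return resp
        # Wait before trying again, except after the final attempt.
        if attempt < max_retries - 1:
            await asyncio.sleep(delay_seconds)
    msg = f"File not found after {max_retries} attempts"
    raise ValueError(msg)
```

Passing the lookup in as a callable keeps the retry loop independent of any particular Slack client, which also makes it easy to exercise with a fake.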

Parameters:

Name Type Description Default
channel_name_or_id str | None

The name or ID of the channel to upload the file to. If not set, the file will be uploaded but not published, and will need to be shared via a link or a call to self.post_message. Defaults to None.

None
filename str | None

Name of the file being uploaded. Should only be used if content is provided. Defaults to None.

None
file str | Path | bytes | None

Path to the file to upload, or the bytes of the file. Should only be used if content is not provided. Defaults to None.

None
content str | bytes | None

Content of the file to upload. Should only be used if file is not provided. Defaults to None.

None
title str | None

Title of the file uploaded. Defaults to None.

None
alt_text str | None

Description of the image for screen readers. Defaults to None.

None
initial_comment str | None

The message text introducing the file in specified channels. If None, no text will be posted. Defaults to None.

None

Raises:

Type Description
FileNotFoundError

If the file does not exist.

ValueError

If the file upload fails.

Returns:

Name Type Description
dict dict

The response from files_info for the uploaded file.

Source code in magistrate/slack.py
async def file_upload(  # noqa: PLR0913
    self,
    *,
    channel_name_or_id: str | None = None,
    filename: str | None = None,
    file: str | Path | bytes | None = None,
    content: str | bytes | None = None,
    title: str | None = None,
    alt_text: str | None = None,
    initial_comment: str | None = None,
) -> dict:
    """Upload a file to a Slack channel.

    This method is largely a thin wrapper around `files_upload_v2`, which itself
    wraps [files.getUploadURLExternal](https://api.slack.com/methods/files.getUploadURLExternal)
    and [files.completeUploadExternal](https://api.slack.com/methods/files.completeUploadExternal).
    The `slack_sdk` library does not document `files_upload_v2`, so the
    [3.19.0 release notes](https://github.com/slackapi/python-slack-sdk/releases/tag/v3.19.0)
    are the best place to look for more information.

    `files_upload_v2` has a slight delay in uploading files (see [slackapi/python-slack-sdk#1521](https://github.com/slackapi/python-slack-sdk/issues/1521#issuecomment-2197783409)),
    so this method polls the Slack API to confirm that the files have actually been
    uploaded. This is done by calling `files_info` on the uploaded file ID until the
    file is found or the maximum number of retries is reached. The dict returned by
    `files_info` is returned by this method.

    Args:
        channel_name_or_id (str | None, optional): The name or ID of the channel to
            upload the file to. If not set, the file will be uploaded but not
            published, and will need to be shared via a link or a call to
            `self.post_message`. Defaults to None.
        filename (str | None, optional): Name of the file being uploaded. Should
            only be used if `content` is provided. Defaults to None.
        file (str | Path | bytes | None, optional): Path to the file to upload, or
            the bytes of the file. Should only be used if `content` is not provided.
            Defaults to None.
        content (str | bytes | None, optional): Content of the file to upload.
            Should only be used if `file` is not provided. Defaults to None.
        title (str | None, optional): Title of the file uploaded. Defaults to None.
        alt_text (str | None, optional): Description of the image for screen
            readers. Defaults to None.
        initial_comment (str | None, optional): The message text introducing the
            file in specified channels. If None, no text will be posted. Defaults to
            None.

    Raises:
        FileNotFoundError: If the file does not exist.
        ValueError: If the file upload fails.

    Returns:
        dict: The response from `files_info` for the uploaded file.
    """
    if channel_name_or_id is not None:
        channel_id = await self.resolve_channel_to_id(channel_name_or_id)
        await self.join_channel_if_necessary(channel_id)
        channels = [channel_id]
    else:
        channels = None

    if isinstance(file, Path | str):
        _file = Path(file)
        if not _file.exists():
            msg = f"File `{file}` does not exist"
            raise FileNotFoundError(msg)
        file = _file.as_posix()

    client = self.get_client()
    _resp = await client.files_upload_v2(
        filename=filename,
        file=file,
        content=content,
        title=title,
        alt_text=alt_text,
        initial_comment=initial_comment,
        channels=channels,
    )
    resp = _resp_dict_check(_resp.data)

    if resp.get("ok", False) and len(files := resp.get("files", [])) == 1:
        return await self._check_file_upload(files[0].get("id"))

    msg = f"Failed to upload file to channel `{channel_name_or_id}`"
    raise ValueError(msg)

flow_status_hook #

flow_status_hook(channel_name_or_id: str = STATUS_SLACK_CHANNEL) -> FlowStateHook

Create a Prefect hook that posts flow status updates to a Slack channel.

The hook will post a message to the specified channel whenever the flow run status changes. The message will contain a summary of the flow run status and a link to the flow run in the Prefect UI.

The message will look something like this:

## Prefect Flow Run Status
*Flow:*                *Run:*
[Flow Name](https://link-to-flow-in-prefect-ui)                [Run Name](https://link-to-flow-run-in-prefect-ui)
*State:*
`State Name`

To use this hook, add it to the flow decorator according to the Prefect documentation:

@flow(
    name="my-flow",
    on_completion=[SlackAssistant().flow_status_hook("#my-channel")],
    on_failure=[SlackAssistant().flow_status_hook("#my-channel")],
    on_crashed=[SlackAssistant().flow_status_hook("#my-channel")],
)
def my_flow():
    pass

Parameters:

Name Type Description Default
channel_name_or_id str

The name or ID of the channel to post the message to. Defaults to magistrate.STATUS_SLACK_CHANNEL.

STATUS_SLACK_CHANNEL

Returns:

Name Type Description
FlowStateHook FlowStateHook

A callable that can be used as a Prefect hook.
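
The source below builds the hook with functools.partial. As a minimal sketch of that pattern, the example binds a channel ahead of time and leaves the `(flow, flow_run, state)` arguments for Prefect to supply when the state changes. `_status_hook` here is a hypothetical stand-in that returns the text instead of posting it; the real private method is not shown on this page.

```python
from functools import partial


def _status_hook(flow, flow_run, state, *, channel_name_or_id: str) -> str:
    """Stand-in for the private hook; returns the text instead of posting it."""
    return f"[{channel_name_or_id}] {flow} / {flow_run}: {state}"


# Bind the channel now; the caller supplies flow, flow_run and state later.
hook = partial(_status_hook, channel_name_or_id="#my-channel")

# Simulate the hook being invoked on a state change, using plain strings
# in place of the Flow, FlowRun and State objects.
message = hook("my-flow", "run-1", "Completed")
```

Binding the channel with `partial` means each call to `flow_status_hook` yields an independent callable, so different flows can report to different channels.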

Source code in magistrate/slack.py
def flow_status_hook(
    self,
    channel_name_or_id: str = STATUS_SLACK_CHANNEL,
) -> FlowStateHook:
    """Create a Prefect hook that posts flow status updates to a Slack channel.

    The hook will post a message to the specified channel whenever the flow run
    status changes. The message will contain a summary of the flow run status and a
    link to the flow run in the Prefect UI.

    The message will look something like this:

    ```
    ## Prefect Flow Run Status
    *Flow:*                *Run:*
    [Flow Name](https://link-to-flow-in-prefect-ui)                [Run Name](https://link-to-flow-run-in-prefect-ui)
    *State:*
    `State Name`
    ```

    To use this hook, add it to the flow decorator according to the [Prefect documentation](https://docs.prefect.io/3.0/develop/manage-states#execute-code-on-state-changes):

    ```python
    @flow(
        name="my-flow",
        on_completion=[SlackAssistant().flow_status_hook("#my-channel")],
        on_failure=[SlackAssistant().flow_status_hook("#my-channel")],
        on_crashed=[SlackAssistant().flow_status_hook("#my-channel")],
    )
    def my_flow():
        pass
    ```

    Args:
        channel_name_or_id (str, optional): The name or ID of the channel to post
            the message to. Defaults to `magistrate.STATUS_SLACK_CHANNEL`.

    Returns:
        FlowStateHook: A callable that can be used as a Prefect hook.
    """
    return partial(  # pyright: ignore[reportReturnType]
        self._flow_status_hook,
        assistant=self,
        channel_name_or_id=channel_name_or_id,
    )

get_channel_id async #

get_channel_id(channel_name: str) -> str

Get the ID of a Slack channel by name.

This method is a thin wrapper around the conversations_list method of the Slack API. It retrieves a list of all channels the bot user is a member of, and returns the ID of the channel with the specified name.

Parameters:

Name Type Description Default
channel_name str

The name of the channel to get the ID of.

required

Raises:

Type Description
ValueError

If the channel with the specified name cannot be found.

Returns:

Name Type Description
str str

The ID of the channel with the specified name.
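
The private `_get_channel_id` helper is not shown on this page. As a sketch of the lookup it describes, the function below scans pages shaped like conversations.list responses (a `channels` list whose entries have `id` and `name`) and returns the matching ID. The name `find_channel_id`, and the choice to strip a leading `#` before comparing, are assumptions made for illustration.

```python
from collections.abc import Iterable


def find_channel_id(pages: Iterable[dict], channel_name: str) -> str:
    """Search conversations.list-style pages for a channel and return its ID.

    Hypothetical helper; the pages are passed in rather than fetched.
    """
    # Slack returns channel names without the leading '#'.
    wanted = channel_name.removeprefix("#")
    for page in pages:
        for channel in page.get("channels", []):
            if channel.get("name") == wanted:
                return channel["id"]
    msg = f"Channel `{channel_name}` not found"
    raise ValueError(msg)
```

Taking an iterable of pages keeps the search logic separate from pagination, so the same function works whether the results arrive in one response or many.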

Source code in magistrate/slack.py
async def get_channel_id(self, channel_name: str) -> str:
    """Get the ID of a Slack channel by name.

    This method is a thin wrapper around the `conversations_list` method of the
    Slack API. It retrieves a list of all channels the bot user is a member of, and
    returns the ID of the channel with the specified name.

    Args:
        channel_name (str): The name of the channel to get the ID of.

    Raises:
        ValueError: If the channel with the specified name cannot be found.

    Returns:
        str: The ID of the channel with the specified name.
    """
    return await _get_channel_id(channel_name, token=self.get_token())

get_client #

get_client() -> AsyncWebClient

Create a new Slack web client instance.

Returns:

Name Type Description
AsyncWebClient AsyncWebClient

A new Slack web client instance.

Source code in magistrate/slack.py
def get_client(self) -> AsyncWebClient:
    """Create a new Slack web client instance.

    Returns:
        AsyncWebClient: A new Slack web client instance.
    """
    return AsyncWebClient(token=self.get_token())

get_token #

get_token() -> str

Get the bot user OAuth token for the Slack app.

Raises:

Type Description
TypeError

If the token is not a str or SecretStr.

Returns:

Name Type Description
str str

The bot user OAuth token for the Slack app.

Source code in magistrate/slack.py
def get_token(self) -> str:
    """Get the bot user OAuth token for the Slack app.

    Raises:
        TypeError: If the token is not a `str` or `SecretStr`.

    Returns:
        str: The bot user OAuth token for the Slack app.
    """
    if isinstance(self.token, str):
        return self.token
    if isinstance(self.token, SecretStr):
        return self.token.get_secret_value()
    msg = f"Invalid token type: {type(self.token).__name__}"
    raise TypeError(msg)

join_channel_if_necessary async #

join_channel_if_necessary(channel_name_or_id: str) -> None

Join a Slack channel if the bot user is not already a member.

Parameters:

Name Type Description Default
channel_name_or_id str

The name or ID of the channel to join.

required

Raises:

Type Description
ValueError

If the bot user could not join the channel.
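
One plausible shape for the "join only if needed" logic is sketched below: check membership first, then join. Whether the private `_join_channel_if_necessary` helper works this way is an assumption, since its source is not shown here. `FakeClient` is a hypothetical in-memory stand-in whose method names mirror the Slack conversations.info and conversations.join calls.

```python
import asyncio


class FakeClient:
    """Hypothetical stand-in for a Slack client; records join calls."""

    def __init__(self, member_of: set[str]) -> None:
        self.member_of = set(member_of)
        self.joined: list[str] = []

    async def conversations_info(self, *, channel: str) -> dict:
        is_member = channel in self.member_of
        return {"ok": True, "channel": {"id": channel, "is_member": is_member}}

    async def conversations_join(self, *, channel: str) -> dict:
        self.joined.append(channel)
        self.member_of.add(channel)
        return {"ok": True}


async def join_if_necessary(client: FakeClient, channel_id: str) -> None:
    """Join the channel only when the bot is not already a member."""
    info = await client.conversations_info(channel=channel_id)
    if info["channel"].get("is_member", False):
        return  # Already a member, nothing to do.
    resp = await client.conversations_join(channel=channel_id)
    if not resp.get("ok", False):
        msg = f"Could not join channel `{channel_id}`"
        raise ValueError(msg)
```

Checking membership first avoids a redundant join call on every message, at the cost of one extra lookup.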

Source code in magistrate/slack.py
async def join_channel_if_necessary(self, channel_name_or_id: str) -> None:
    """Join a Slack channel if the bot user is not already a member.

    Args:
        channel_name_or_id (str): The name or ID of the channel to join.

    Raises:
        ValueError: If the bot user could not join the channel.
    """
    await _join_channel_if_necessary(
        channel_id=await self.resolve_channel_to_id(channel_name_or_id),
        token=self.get_token(),
    )

post_message async #

post_message(channel_name_or_id: str, *, text: str | None = None, attachments: str | list[dict | Attachment] | None = None, blocks: str | list[dict | Block] | None = None) -> dict

Post a message to a Slack channel.

Parameters:

Name Type Description Default
channel_name_or_id str

The name or ID of the channel to post the message to.

required
text str | None

Standard text to post to the channel. Defaults to None.

None
attachments str | list[dict | Attachment] | None

Attachment(s) to post alongside the text or blocks. Defaults to None.

None
blocks str | list[dict | Block] | None

Slack Block(s) to post to the channel. Blocks are a more advanced way to format messages in Slack, and can be used to create more complex layouts and interactions than plain text messages or attachments. Defaults to None.

None

Raises:

Type Description
ValueError

If more than 50 blocks are passed.

Returns:

Name Type Description
dict dict

The response from the Slack API.
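
The source below rejects lists of more than 50 blocks. A caller with a longer layout might split it across several messages first. The helper `chunk_blocks` below is a hypothetical sketch of that and is not part of magistrate.

```python
# Limit enforced by post_message, per the source below.
MAX_BLOCKS_PER_MESSAGE = 50


def chunk_blocks(
    blocks: list[dict],
    size: int = MAX_BLOCKS_PER_MESSAGE,
) -> list[list[dict]]:
    """Split a list of blocks into consecutive batches of at most `size`."""
    return [blocks[i : i + size] for i in range(0, len(blocks), size)]


# Example: 120 divider blocks become batches of 50, 50 and 20.
batches = chunk_blocks([{"type": "divider"} for _ in range(120)])
```

Each batch could then be passed as the blocks argument of a separate post_message call.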

Source code in magistrate/slack.py
async def post_message(
    self,
    channel_name_or_id: str,
    *,
    text: str | None = None,
    attachments: str | list[dict | Attachment] | None = None,
    blocks: str | list[dict | SlackBlock] | None = None,
) -> dict:
    """Post a message to a Slack channel.

    Args:
        channel_name_or_id (str): The name or ID of the channel to post the message
            to.
        text (str | None, optional): Standard text to post to the channel. Defaults
            to None.
        attachments (str | list[dict  |  Attachment] | None, optional):
            Attachment(s) to post alongside the text or blocks. Defaults to None.
        blocks (str | list[dict  |  SlackBlock] | None, optional): Slack Block(s) to
            post to the channel. Blocks are a more advanced way to format messages
            in Slack, and can be used to create more complex layouts and
            interactions than plain text messages or attachments. Defaults to None.

    Raises:
        ValueError: If more than 50 blocks are passed.

    Returns:
        dict: The response from the Slack API.
    """
    if isinstance(blocks, list) and len(blocks) > 50:  # noqa: PLR2004
        msg = "Slack API only allows 50 blocks per message"
        raise ValueError(msg)

    channel_id = await self.resolve_channel_to_id(channel_name_or_id)

    await self.join_channel_if_necessary(channel_id)

    client = self.get_client()
    _resp = await client.chat_postMessage(
        channel=channel_id,
        text=text,
        attachments=attachments,
        blocks=blocks,
    )

    return _resp_dict_check(_resp.data)

resolve_channel_to_id async #

resolve_channel_to_id(channel_name_or_id: str) -> str

Convert a channel name or ID to a channel ID.

Slack expects channel IDs for most API calls, so this method makes it easier to work with channel names, which are more human-readable.

Channel names should start with a # character, while channel IDs should start with C07.

Parameters:

Name Type Description Default
channel_name_or_id str

The channel name or ID to resolve.

required

Raises:

Type Description
ValueError

If the input is neither a channel name nor ID.

Returns:

Name Type Description
str str

The resolved channel ID.
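
The private `_is_channel_id` and `_is_channel_name` checks are not shown on this page. The sketch below assumes they follow the prefix convention stated above (`#` for names, `C07` for IDs); the function names and the `classify` helper are hypothetical.

```python
def is_channel_id(value: str) -> bool:
    """Assumed check: IDs start with the `C07` prefix described above."""
    return value.startswith("C07")


def is_channel_name(value: str) -> bool:
    """Assumed check: names start with a `#` character."""
    return value.startswith("#")


def classify(value: str) -> str:
    """Return "id" or "name", or raise if the input matches neither form."""
    if is_channel_id(value):
        return "id"
    if is_channel_name(value):
        return "name"
    msg = f"Invalid channel name or ID: `{value}`"
    raise ValueError(msg)
```

Under this convention a bare name such as `general` is rejected, so callers need to include the leading `#`.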

Source code in magistrate/slack.py
async def resolve_channel_to_id(self, channel_name_or_id: str) -> str:
    """Convert a channel name or ID to a channel ID.

    Slack expects channel IDs for most API calls, so this method makes it easier to
    work with channel names, which are more human-readable.

    Channel names should start with a `#` character, while channel IDs should start
    with `C07`.

    Args:
        channel_name_or_id (str): The channel name or ID to resolve.

    Raises:
        ValueError: If the input is neither a channel name nor ID.

    Returns:
        str: The resolved channel ID.
    """
    if self._is_channel_id(channel_name_or_id):
        return channel_name_or_id
    if self._is_channel_name(channel_name_or_id):
        return await self.get_channel_id(channel_name_or_id)
    msg = f"Invalid channel name or ID: `{channel_name_or_id}`"
    raise ValueError(msg)

magistrate.STATUS_SLACK_CHANNEL module-attribute #

STATUS_SLACK_CHANNEL = '#prefect-status-checks'