Redirect loguru logging messages to the prefect run logger.
This function should be called from within a Prefect task or flow before calling
any module that uses loguru. This function can be safely called multiple times. This
was pulled from anna-geller/loguru.py
Example usage:
from prefect import flow
from loguru import logger
# import this function in your flow from your module
from magistrate import enable_loguru_support
@flow()
def myflow():
logger.info("This is hidden from the Prefect UI")
enable_loguru_support()
logger.info("This shows up in the Prefect UI")
Source code in magistrate/loguru.py
def enable_loguru_support() -> None:
    """Redirect loguru logging messages to the Prefect run logger.

    This function should be called from within a Prefect task or flow before calling
    any module that uses loguru. This function can be safely called multiple times:
    each successful call removes every existing loguru handler and installs a single
    fresh one. This was pulled from
    [anna-geller/loguru.py](https://gist.github.com/anna-geller/0b9e6ecbde45c355af425cd5b97e303d/f1bfb7038f5d5fecc1e13245b50f8251c3c00f91)

    Records are routed by loguru severity number, so every record reaches the run
    logger, including ``TRACE`` and user-defined levels:

    - severity >= 50 (``CRITICAL``) goes to ``run_logger.critical``
    - severity >= 40 (``ERROR``) goes to ``run_logger.error``
    - severity >= 30 (``WARNING``) goes to ``run_logger.warning``
    - severity >= 20 (``INFO``, ``SUCCESS``) goes to ``run_logger.info``
    - anything lower (``DEBUG``, ``TRACE``) goes to ``run_logger.debug``

    If no Prefect run logger is available (``get_run_logger`` raises
    ``MissingContextError``), a warning is emitted through loguru and the existing
    loguru handlers are left untouched.

    !!! example "Example usage"
        ```python
        from prefect import flow
        from loguru import logger

        # import this function in your flow from your module
        from magistrate import enable_loguru_support


        @flow()
        def myflow():
            logger.info("This is hidden from the Prefect UI")
            enable_loguru_support()
            logger.info("This shows up in the Prefect UI")
        ```
    """
    # import here for distributed execution because loguru cannot be pickled.
    from loguru import logger  # noqa: PLC0415

    try:
        run_logger = get_run_logger()
    except MissingContextError:
        msg = (
            "Loguru support is enabled, but no Prefect run logger is available. "
            "You may be trying to use loguru outside of a Prefect flow or task."
        )
        logger.warning(msg)
        return

    # Drop all existing handlers (including loguru's default stderr sink) so that
    # repeated calls do not stack duplicate handlers.
    logger.remove()

    # (minimum loguru severity, Prefect logger method), highest severity first.
    # Routing on the numeric severity instead of the exact level name keeps the
    # built-in mapping intact (SUCCESS=25 still lands on info) while also
    # covering TRACE (5) and custom levels, which would otherwise match no
    # handler and be discarded now that the default sink has been removed.
    severity_routes = (
        (50, run_logger.critical),
        (40, run_logger.error),
        (30, run_logger.warning),
        (20, run_logger.info),
    )

    def _forward_to_prefect(message) -> None:
        """Send one formatted loguru message to the matching run-logger method."""
        severity = message.record["level"].no
        for minimum, emit in severity_routes:
            if severity >= minimum:
                emit(message)
                return
        # Below INFO: DEBUG, TRACE and any low-severity custom level.
        run_logger.debug(message)

    # level="TRACE" lets every built-in level through to the sink; filtering by
    # the configured log level is left to the Prefect run logger itself.
    logger.add(sink=_forward_to_prefect, level="TRACE")
|