-
Notifications
You must be signed in to change notification settings - Fork 2.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
pprint_run_response doesn't print if it is ResponseModel in case of stream #1642
Comments
Hey @yogin16, can you help me with your code so that I can reproduce it on my end? |
import json
from typing import Optional, Iterator
from pydantic import BaseModel, Field
from phi.agent import Agent
from phi.model.openai import OpenAIChat
from phi.workflow import Workflow, RunResponse, RunEvent
from phi.storage.workflow.sqlite import SqlWorkflowStorage
from phi.tools.duckduckgo import DuckDuckGo
from phi.utils.pprint import pprint_run_response
from phi.utils.log import logger
class NewsArticle(BaseModel):
title: str = Field(..., description="Title of the article.")
url: str = Field(..., description="Link to the article.")
summary: Optional[str] = Field(..., description="Summary of the article if available.")
class SearchResults(BaseModel):
articles: list[NewsArticle]
class BlogDraft(BaseModel):
title: str = Field(..., description="Title of the drafted blog)
content: str = File(..., description="Blog content in the markdown format)
class BlogPostGenerator(Workflow):
# Define an Agent that will search the web for a topic
searcher: Agent = Agent(
model=OpenAIChat(id="gpt-4o-mini"),
tools=[DuckDuckGo()],
instructions=["Given a topic, search for the top 5 articles."],
response_model=SearchResults,
structured_outputs=True,
)
# Define an Agent that will write the blog post
writer: Agent = Agent(
model=OpenAIChat(id="gpt-4o"),
instructions=[
"You will be provided with a topic and a list of top articles on that topic.",
"Carefully read each article and generate a New York Times worthy blog post on that topic.",
"Break the blog post into sections and provide key takeaways at the end.",
"Make sure the title is catchy and engaging.",
"Always provide sources, do not make up information or sources.",
],
response_model=BlogDraft,
)
def run(self, topic: str, use_cache: bool = True) -> Iterator[RunResponse]:
"""This is where the main logic of the workflow is implemented."""
logger.info(f"Generating a blog post on: {topic}")
# Step 1: Use the cached blog post if use_cache is True
if use_cache:
cached_blog_post = self.get_cached_blog_post(topic)
if cached_blog_post:
yield RunResponse(content=cached_blog_post, event=RunEvent.workflow_completed)
return
# Step 2: Search the web for articles on the topic
search_results: Optional[SearchResults] = self.get_search_results(topic)
# If no search_results are found for the topic, end the workflow
if search_results is None or len(search_results.articles) == 0:
yield RunResponse(
event=RunEvent.workflow_completed,
content=f"Sorry, could not find any articles on the topic: {topic}",
)
return
# Step 3: Write a blog post
yield from self.write_blog_post(topic, search_results)
def get_cached_blog_post(self, topic: str) -> Optional[str]:
"""Get the cached blog post for a topic."""
logger.info("Checking if cached blog post exists")
return self.session_state.get("blog_posts", {}).get(topic)
def add_blog_post_to_cache(self, topic: str, blog_post: Optional[str]):
"""Add a blog post to the cache."""
logger.info(f"Saving blog post for topic: {topic}")
self.session_state.setdefault("blog_posts", {})
self.session_state["blog_posts"][topic] = blog_post
def get_search_results(self, topic: str) -> Optional[SearchResults]:
"""Get the search results for a topic."""
MAX_ATTEMPTS = 3
for attempt in range(MAX_ATTEMPTS):
try:
searcher_response: RunResponse = self.searcher.run(topic)
# Check if we got a valid response
if not searcher_response or not searcher_response.content:
logger.warning(f"Attempt {attempt + 1}/{MAX_ATTEMPTS}: Empty searcher response")
continue
# Check if the response is of the expected SearchResults type
if not isinstance(searcher_response.content, SearchResults):
logger.warning(f"Attempt {attempt + 1}/{MAX_ATTEMPTS}: Invalid response type")
continue
article_count = len(searcher_response.content.articles)
logger.info(f"Found {article_count} articles on attempt {attempt + 1}")
return searcher_response.content
except Exception as e:
logger.warning(f"Attempt {attempt + 1}/{MAX_ATTEMPTS} failed: {str(e)}")
logger.error(f"Failed to get search results after {MAX_ATTEMPTS} attempts")
return None
def write_blog_post(self, topic: str, search_results: SearchResults) -> Iterator[RunResponse]:
"""Write a blog post on a topic."""
logger.info("Writing blog post")
# Prepare the input for the writer
writer_input = {"topic": topic, "articles": [v.model_dump() for v in search_results.articles]}
# Run the writer and yield the response
yield from self.writer.run(json.dumps(writer_input, indent=4), stream=True)
# Save the blog post in the cache
self.add_blog_post_to_cache(topic, self.writer.run_response.content)
# Run the workflow if the script is executed directly
if __name__ == "__main__":
from rich.prompt import Prompt
# Get topic from user
topic = Prompt.ask(
"[bold]Enter a blog post topic[/bold]\n✨",
default="Why Cats Secretly Run the Internet",
)
# Convert the topic to a URL-safe string for use in session_id
url_safe_topic = topic.lower().replace(" ", "-")
# Initialize the blog post generator workflow
# - Creates a unique session ID based on the topic
# - Sets up SQLite storage for caching results
generate_blog_post = BlogPostGenerator(
session_id=f"generate-blog-post-on-{url_safe_topic}",
storage=SqlWorkflowStorage(
table_name="generate_blog_post_workflows",
db_file="tmp/workflows.db",
),
)
# Execute the workflow with caching enabled
# Returns an iterator of RunResponse objects containing the generated content
blog_post: Iterator[RunResponse] = generate_blog_post.run(topic=topic, use_cache=True)
# Print the response
pprint_run_response(blog_post) |
@yogin16 the problem is that your |
I think that is part of the point. tuple is coming from agent only when using ResponseModel let me explain more in the detail. pprint works in following case import json
from typing import Optional, Iterator
from pydantic import BaseModel, Field
from phi.agent import Agent
from phi.model.openai import OpenAIChat
from phi.workflow import Workflow, RunResponse, RunEvent
from phi.storage.workflow.sqlite import SqlWorkflowStorage
from phi.tools.duckduckgo import DuckDuckGo
from phi.utils.pprint import pprint_run_response
from phi.utils.log import logger
class NewsArticle(BaseModel):
title: str = Field(..., description="Title of the article.")
url: str = Field(..., description="Link to the article.")
summary: Optional[str] = Field(..., description="Summary of the article if available.")
class SearchResults(BaseModel):
articles: list[NewsArticle]
class BlogDraft(BaseModel):
title: str = Field(..., description="Title of the drafted blog")
content: str = Field(..., description="Blog content in the markdown format")
class BlogPostGenerator(Workflow):
# Define an Agent that will search the web for a topic
searcher: Agent = Agent(
model=OpenAIChat(id="gpt-4o-mini"),
tools=[DuckDuckGo()],
instructions=["Given a topic, search for the top 5 articles."],
response_model=SearchResults
)
# Define an Agent that will write the blog post
writer: Agent = Agent(
model=OpenAIChat(id="gpt-4o"),
instructions=[
"You will be provided with a topic and a list of top articles on that topic.",
"Carefully read each article and generate a New York Times worthy blog post on that topic.",
"Break the blog post into sections and provide key takeaways at the end.",
"Make sure the title is catchy and engaging.",
"Always provide sources, do not make up information or sources.",
],
)
def run(self, topic: str, use_cache: bool = True) -> Iterator[RunResponse]:
"""This is where the main logic of the workflow is implemented."""
logger.info(f"Generating a blog post on: {topic}")
# Step 1: Use the cached blog post if use_cache is True
if use_cache:
cached_blog_post = self.get_cached_blog_post(topic)
if cached_blog_post:
yield RunResponse(content=cached_blog_post, event=RunEvent.workflow_completed)
return
# Step 2: Search the web for articles on the topic
search_results: Optional[SearchResults] = self.get_search_results(topic)
# If no search_results are found for the topic, end the workflow
if search_results is None or len(search_results.articles) == 0:
yield RunResponse(
event=RunEvent.workflow_completed,
content=f"Sorry, could not find any articles on the topic: {topic}",
)
return
# Step 3: Write a blog post
yield from self.write_blog_post(topic, search_results)
def get_cached_blog_post(self, topic: str) -> Optional[str]:
"""Get the cached blog post for a topic."""
logger.info("Checking if cached blog post exists")
return self.session_state.get("blog_posts", {}).get(topic)
def add_blog_post_to_cache(self, topic: str, blog_post: Optional[str]):
"""Add a blog post to the cache."""
logger.info(f"Saving blog post for topic: {topic}")
self.session_state.setdefault("blog_posts", {})
self.session_state["blog_posts"][topic] = blog_post
def get_search_results(self, topic: str) -> Optional[SearchResults]:
"""Get the search results for a topic."""
MAX_ATTEMPTS = 3
for attempt in range(MAX_ATTEMPTS):
try:
searcher_response: RunResponse = self.searcher.run(topic)
# Check if we got a valid response
if not searcher_response or not searcher_response.content:
logger.warning(f"Attempt {attempt + 1}/{MAX_ATTEMPTS}: Empty searcher response")
continue
# Check if the response is of the expected SearchResults type
if not isinstance(searcher_response.content, SearchResults):
logger.warning(f"Attempt {attempt + 1}/{MAX_ATTEMPTS}: Invalid response type")
continue
article_count = len(searcher_response.content.articles)
logger.info(f"Found {article_count} articles on attempt {attempt + 1}")
return searcher_response.content
except Exception as e:
logger.warning(f"Attempt {attempt + 1}/{MAX_ATTEMPTS} failed: {str(e)}")
logger.error(f"Failed to get search results after {MAX_ATTEMPTS} attempts")
return None
def write_blog_post(self, topic: str, search_results: SearchResults) -> Iterator[RunResponse]:
"""Write a blog post on a topic."""
logger.info("Writing blog post")
# Prepare the input for the writer
writer_input = {"topic": topic, "articles": [v.model_dump() for v in search_results.articles]}
# Run the writer and yield the response
writer_response = self.writer.run(json.dumps(writer_input, indent=4), stream=True)
print("***********")
print(type(writer_response))
print("***********")
yield from writer_response
# Save the blog post in the cache
self.add_blog_post_to_cache(topic, self.writer.run_response.content)
# Run the workflow if the script is executed directly
if __name__ == "__main__":
from rich.prompt import Prompt
# Get topic from user
topic = Prompt.ask(
"[bold]Enter a blog post topic[/bold]\n✨",
default="Why Cats Secretly Run the Internet",
)
# Convert the topic to a URL-safe string for use in session_id
url_safe_topic = topic.lower().replace(" ", "-")
# Initialize the blog post generator workflow
# - Creates a unique session ID based on the topic
# - Sets up SQLite storage for caching results
generate_blog_post = BlogPostGenerator(
session_id=f"generate-blog-post-on-{url_safe_topic}",
storage=SqlWorkflowStorage(
table_name="generate_blog_post_workflows",
db_file="tmp/workflows.db",
),
)
# Execute the workflow with caching enabled
# Returns an iterator of RunResponse objects containing the generated content
blog_post: Iterator[RunResponse] = generate_blog_post.run(topic=topic, use_cache=True)
print(type(blog_post))
# Print the response
pprint_run_response(blog_post)
# for resp in blog_post:
# print(type(resp))
# print(resp)
# print(resp.content) Notice writer agent doesn't use ResponseModel in this case writer: Agent = Agent(
model=OpenAIChat(id="gpt-4o"),
instructions=[
"You will be provided with a topic and a list of top articles on that topic.",
"Carefully read each article and generate a New York Times worthy blog post on that topic.",
"Break the blog post into sections and provide key takeaways at the end.",
"Make sure the title is catchy and engaging.",
"Always provide sources, do not make up information or sources.",
], if the agent was supposed to use the response model, pprint breaks. and here is the example for broken case i.e., adding this param for the writer agent writer: Agent = Agent(
model=OpenAIChat(id="gpt-4o"),
instructions=[
"You will be provided with a topic and a list of top articles on that topic.",
"Carefully read each article and generate a New York Times worthy blog post on that topic.",
"Break the blog post into sections and provide key takeaways at the end.",
"Make sure the title is catchy and engaging.",
"Always provide sources, do not make up information or sources.",
],
response_model=BlogDraft, Here is the complete case with this one line changed: import json
from typing import Optional, Iterator
from pydantic import BaseModel, Field
from phi.agent import Agent
from phi.model.openai import OpenAIChat
from phi.workflow import Workflow, RunResponse, RunEvent
from phi.storage.workflow.sqlite import SqlWorkflowStorage
from phi.tools.duckduckgo import DuckDuckGo
from phi.utils.pprint import pprint_run_response
from phi.utils.log import logger
class NewsArticle(BaseModel):
title: str = Field(..., description="Title of the article.")
url: str = Field(..., description="Link to the article.")
summary: Optional[str] = Field(..., description="Summary of the article if available.")
class SearchResults(BaseModel):
articles: list[NewsArticle]
class BlogDraft(BaseModel):
title: str = Field(..., description="Title of the drafted blog")
content: str = Field(..., description="Blog content in the markdown format")
class BlogPostGenerator(Workflow):
# Define an Agent that will search the web for a topic
searcher: Agent = Agent(
model=OpenAIChat(id="gpt-4o-mini"),
tools=[DuckDuckGo()],
instructions=["Given a topic, search for the top 5 articles."],
response_model=SearchResults
)
# Define an Agent that will write the blog post
writer: Agent = Agent(
model=OpenAIChat(id="gpt-4o"),
instructions=[
"You will be provided with a topic and a list of top articles on that topic.",
"Carefully read each article and generate a New York Times worthy blog post on that topic.",
"Break the blog post into sections and provide key takeaways at the end.",
"Make sure the title is catchy and engaging.",
"Always provide sources, do not make up information or sources.",
],
response_model=BlogDraft, # ADDED THIS LINE TO GET STRUCTURED OUTPUT
)
def run(self, topic: str, use_cache: bool = True) -> Iterator[RunResponse]:
"""This is where the main logic of the workflow is implemented."""
logger.info(f"Generating a blog post on: {topic}")
# Step 1: Use the cached blog post if use_cache is True
if use_cache:
cached_blog_post = self.get_cached_blog_post(topic)
if cached_blog_post:
yield RunResponse(content=cached_blog_post, event=RunEvent.workflow_completed)
return
# Step 2: Search the web for articles on the topic
search_results: Optional[SearchResults] = self.get_search_results(topic)
# If no search_results are found for the topic, end the workflow
if search_results is None or len(search_results.articles) == 0:
yield RunResponse(
event=RunEvent.workflow_completed,
content=f"Sorry, could not find any articles on the topic: {topic}",
)
return
# Step 3: Write a blog post
yield from self.write_blog_post(topic, search_results)
def get_cached_blog_post(self, topic: str) -> Optional[str]:
"""Get the cached blog post for a topic."""
logger.info("Checking if cached blog post exists")
return self.session_state.get("blog_posts", {}).get(topic)
def add_blog_post_to_cache(self, topic: str, blog_post: Optional[str]):
"""Add a blog post to the cache."""
logger.info(f"Saving blog post for topic: {topic}")
self.session_state.setdefault("blog_posts", {})
self.session_state["blog_posts"][topic] = blog_post
def get_search_results(self, topic: str) -> Optional[SearchResults]:
"""Get the search results for a topic."""
MAX_ATTEMPTS = 3
for attempt in range(MAX_ATTEMPTS):
try:
searcher_response: RunResponse = self.searcher.run(topic)
# Check if we got a valid response
if not searcher_response or not searcher_response.content:
logger.warning(f"Attempt {attempt + 1}/{MAX_ATTEMPTS}: Empty searcher response")
continue
# Check if the response is of the expected SearchResults type
if not isinstance(searcher_response.content, SearchResults):
logger.warning(f"Attempt {attempt + 1}/{MAX_ATTEMPTS}: Invalid response type")
continue
article_count = len(searcher_response.content.articles)
logger.info(f"Found {article_count} articles on attempt {attempt + 1}")
return searcher_response.content
except Exception as e:
logger.warning(f"Attempt {attempt + 1}/{MAX_ATTEMPTS} failed: {str(e)}")
logger.error(f"Failed to get search results after {MAX_ATTEMPTS} attempts")
return None
def write_blog_post(self, topic: str, search_results: SearchResults) -> Iterator[RunResponse]:
"""Write a blog post on a topic."""
logger.info("Writing blog post")
# Prepare the input for the writer
writer_input = {"topic": topic, "articles": [v.model_dump() for v in search_results.articles]}
# Run the writer and yield the response
writer_response = self.writer.run(json.dumps(writer_input, indent=4), stream=True)
print("***********")
print(type(writer_response))
print("***********")
yield from writer_response
# Save the blog post in the cache
self.add_blog_post_to_cache(topic, self.writer.run_response.content)
# Run the workflow if the script is executed directly
if __name__ == "__main__":
from rich.prompt import Prompt
# Get topic from user
topic = Prompt.ask(
"[bold]Enter a blog post topic[/bold]\n✨",
default="Why Cats Secretly Run the Internet",
)
# Convert the topic to a URL-safe string for use in session_id
url_safe_topic = topic.lower().replace(" ", "-")
# Initialize the blog post generator workflow
# - Creates a unique session ID based on the topic
# - Sets up SQLite storage for caching results
generate_blog_post = BlogPostGenerator(
session_id=f"generate-blog-post-on-{url_safe_topic}",
storage=SqlWorkflowStorage(
table_name="generate_blog_post_workflows",
db_file="tmp/workflows.db",
),
)
# Execute the workflow with caching enabled
# Returns an iterator of RunResponse objects containing the generated content
blog_post: Iterator[RunResponse] = generate_blog_post.run(topic=topic, use_cache=True)
print(type(blog_post))
# Print the response
# pprint_run_response(blog_post) <- THIS DOESN'T WORK
for resp in blog_post:
print(type(resp))
print(resp)
print(resp.content) One would assume that all agent run responses are composable for a workflow as well as pprint_run_response to handle structured response content (like it does in the non streaming case?) |
@yogin16 I think you are using this cookbook to write your version of it https://github.com/phidatahq/phidata/blob/main/cookbook/workflows/blog_post_generator.py There might be a very small thing that we might be missing here but I suggest taking a look at the cookbook. |
Of course there are workaround available i can work with, as we are talking about just pprint util here. let me make the point more clear still so it is actionable. if you look the the pprint implementation here https://github.com/phidatahq/phidata/blob/main/phi/utils/pprint.py if isinstance(run_response.content, str):
single_response_content = (
Markdown(run_response.content) if markdown else run_response.get_content_as_string(indent=4)
)
elif isinstance(run_response.content, BaseModel):
try:
single_response_content = JSON(run_response.content.model_dump_json(exclude_none=True), indent=2)
except Exception as e:
logger.warning(f"Failed to convert response to Markdown: {e}")
else:
try:
single_response_content = JSON(json.dumps(run_response.content), indent=4)
except Exception as e:
logger.warning(f"Failed to convert response to string: {e}") it handles the case of BaseModel as well -- but only when run_response is a single RunResponse for streaming no handling for BaseModel case. is that expected? streaming_response_content: str = ""
with Live(console=console) as live_log:
status = Status("Working...", spinner="dots")
live_log.update(status)
response_timer = Timer()
response_timer.start()
for resp in run_response:
if isinstance(resp, RunResponse) and isinstance(resp.content, str):
streaming_response_content += resp.content
formatted_response = Markdown(streaming_response_content) if markdown else streaming_response_content # type: ignore
table = Table(box=ROUNDED, border_style="blue", show_header=False)
if show_time:
table.add_row(f"Response\n({response_timer.elapsed:.1f}s)", formatted_response) # type: ignore
else:
table.add_row(formatted_response) # type: ignore
live_log.update(table)
response_timer.stop() do we have a cookbook example where the last agent is using a response_model? thanks |
another bigger discussion point maybe is if we have following code anywhere agent_response = self.agent.run(json.dumps(writer_input, indent=4), stream=True) where agent is the phidata Agent do we always expect agent_response to be of type RunResponse? because it seems that it can be a tuple as well! |
pprint_run_response print blank response if it is Iterable and from structured response. Possibility in case of a Workflow.
/phi/utils/pprint.py
when resp.conent is not str and something of BaseModel, it prints blank
The text was updated successfully, but these errors were encountered: