Hello everyone,
I’ve been working on a custom workflow agent (NewsReportGenerator
) using the phidata, It prompts for a topic, then performs web searches, scrapes articles, and finally generates a legal-style analysis report. Everything is good in local testing.
However, I’m a bit confused about the next steps for deployment
how to integrate this workflow into the FastAPI app provided by the default phidata agent-app
template, and how to make it accessible via an HTTP endpoint. I’d like to run everything inside Docker with phi ws up --group api
, so others can call my agent or incorporate it into their own workflows/UI.
Here’s a summary of my current setup:
- File:
analisis.py
defines:
- A
Workflow
subclass namedNewsReportGenerator
. - Three agent objects (
web_searcher
,article_scraper
,writer
). - A main block that runs
generate_news_report.run(...)
if called directly.
- Goal:
- Make
NewsReportGenerator
available via an endpoint (e.g.,POST /v1/analisis/run_workflow
). - Deploy everything within the phidata Docker environment, so it’s production-ready.
- What I’ve Tried:
- Following the official phidata docs to create a router and import the workflow.
- Making a new
analisis_router.py
and registering it inv1_router.py
. - Checking container logs to ensure everything starts up correctly.
- Questions:
- Is there a recommended or documented way (beyond the basic examples) to expose a phidata
Workflow
via FastAPI? - After I run
phi ws up --group api
, how do I handle streaming outputs or longer-running workflows in a more user-friendly manner?
If anyone has a step-by-step guide or can point me to an up-to-date reference, I would really appreciate it. I’ve pieced together partial solutions from various docs, but it still feels somewhat scattered.
Thanks in advance for any help or examples you can provide!
i have put this agent into agent folder, or where i should put this file
"""
1. Pastikan untuk meng-install dependencies menggunakan:
`pip install openai duckduckgo-search newspaper4k lxml_html_clean sqlalchemy phidata`
2. Jalankan skrip ini dengan perintah:
`python cookbook/workflows/news_article_generator.py`
"""
import json
from textwrap import dedent
from typing import Optional, Dict, Iterator
from pydantic import BaseModel, Field
from phi.agent import Agent
from phi.model.openai import OpenAIChat
from phi.workflow import Workflow, RunResponse, RunEvent
from phi.storage.workflow.sqlite import SqlWorkflowStorage
from phi.tools.duckduckgo import DuckDuckGo
from phi.tools.newspaper4k import Newspaper4k
from phi.utils.pprint import pprint_run_response
from phi.utils.log import logger
class NewsArticle(BaseModel):
"""
Representasi sebuah artikel berita dengan judul, url, dan ringkasan (jika ada).
"""
title: str = Field(..., description="Judul artikel.")
url: str = Field(..., description="Tautan ke artikel.")
summary: Optional[str] = Field(..., description="Ringkasan artikel jika tersedia.")
class SearchResults(BaseModel):
"""
Kumpulan hasil pencarian yang berisi daftar artikel.
"""
articles: list[NewsArticle]
class ScrapedArticle(BaseModel):
"""
Representasi artikel yang telah di-scrape dengan:
- judul
- tautan
- ringkasan
- konten dalam format markdown (jika memungkinkan).
"""
title: str = Field(..., description="Judul artikel.")
url: str = Field(..., description="Tautan ke artikel.")
summary: Optional[str] = Field(..., description="Ringkasan artikel jika tersedia.")
content: Optional[str] = Field(
...,
description="Konten artikel dalam format markdown jika tersedia. Gunakan None jika konten tidak ada atau tidak relevan.",
)
class NewsReportGenerator(Workflow):
# Deskripsi ini hanya digunakan di antarmuka Workflow
description: str = "Menghasilkan laporan berita komprehensif terhadap topik tertentu."
# AGENT UNTUK PENCARIAN WEB
web_searcher: Agent = Agent(
model=OpenAIChat(id="gpt-4o-mini"),
tools=[DuckDuckGo()],
instructions=[
"Diberikan sebuah topik, carilah 10 artikel dan kembalikan 5 artikel yang paling relevan.",
],
response_model=SearchResults,
)
# AGENT UNTUK SCRAPING ARTIKEL
article_scraper: Agent = Agent(
model=OpenAIChat(id="gpt-4o-mini"),
tools=[Newspaper4k()],
instructions=[
"Diberikan sebuah URL, lakukan scraping artikel tersebut. Kembalikan judul, url, dan konten berformat markdown.",
"Jika konten tidak tersedia atau tidak masuk akal, kembalikan None untuk konten.",
],
response_model=ScrapedArticle,
)
# AGENT UNTUK MENULIS LAPORAN (PENELITI HUKUM)
writer: Agent = Agent(
model=OpenAIChat(id="gpt-4o"),
description=(
"Anda adalah seorang Peneliti Hukum di Indonesia. "
"Tugas Anda adalah melakukan tinjauan yuridis secara mendalam terhadap sebuah kasus "
"berdasarkan sumber-sumber hukum yang relevan. "
"Anda harus selalu menyertakan rujukan peraturan perundang-undangan, link sumber, "
"dan dasar hukum resmi atau dokumen pendukung lainnya. "
"Output yang dihasilkan harus memenuhi standar akademik atau keilmuan hukum."
),
instructions=[
"Anda akan diberikan artikel, referensi, atau informasi seputar kasus tertentu.",
"Bacalah secara seksama untuk mendapatkan konteks hukum yang relevan di Indonesia.",
"Selalu cantumkan pasal, undang-undang, peraturan pemerintah, atau yurisprudensi yang relevan.",
"Tulis output dalam format analisis hukum yang baku dan sertakan sumber dengan link (jika ada) di bagian akhir.",
"Usahakan untuk menggunakan istilah hukum yang tepat dan hindari spekulasi tanpa dasar hukum yang jelas."
],
expected_output=dedent("""\
<analysis_format>
## Pendahuluan
{Jelaskan secara singkat konteks kasus dan alasan pentingnya ditinjau dari sisi hukum}
## Dasar Hukum
{Rincikan peraturan perundang-undangan, pasal, yurisprudensi, atau doktrin hukum yang relevan.
Berikan penjelasan singkat mengenai mengapa pasal/UU tersebut berlaku.}
## Analisis Kasus
{Analisis fakta-fakta yang relevan dengan ketentuan hukum di atas.
Jika perlu, sertakan pertimbangan yurisprudensi atau pendapat ahli lain (doktrin).}
## Kesimpulan
{Simpulkan temuan atau analisis Anda.
Berikan keilmuan mu sesuai dengan dasar hukum.}
## Sumber
- [Nama UU/Peraturan/Putusan/Website/Lembaga Terkait](URL)
- [Nama Sumber Tambahan](URL)
... (tambahkan sesuai kebutuhan)
</analysis_format>
"""),
)
def get_report_from_cache(self, topic: str) -> Optional[str]:
"""
Memeriksa apakah laporan untuk suatu topik sudah ada di cache (session_state).
"""
logger.info("Memeriksa apakah laporan sudah tersedia di cache")
return self.session_state.get("reports", {}).get(topic)
def add_report_to_cache(self, topic: str, report: Optional[str]):
"""
Menyimpan laporan ke dalam cache (session_state).
"""
logger.info(f"Menyimpan laporan untuk topik: {topic}")
self.session_state.setdefault("reports", {})
self.session_state["reports"][topic] = report
def get_search_results(self, topic: str, use_search_cache: bool) -> Optional[SearchResults]:
"""
Mendapatkan hasil pencarian artikel terkait topik.
Menggunakan cache jika diizinkan dan tersedia,
jika tidak, akan memanggil web_searcher.
"""
search_results: Optional[SearchResults] = None
# Mengambil hasil pencarian dari cache jika tersedia dan diperbolehkan
if (
use_search_cache
and "search_results" in self.session_state
and topic in self.session_state["search_results"]
):
try:
search_results = SearchResults.model_validate(self.session_state["search_results"][topic])
logger.info(f"Ditemukan {len(search_results.articles)} artikel di cache.")
except Exception as e:
logger.warning(f"Tidak dapat membaca hasil pencarian dari cache: {e}")
# Jika tidak ada di cache, lakukan pencarian baru
if search_results is None:
web_searcher_response: RunResponse = self.web_searcher.run(topic)
if (
web_searcher_response
and web_searcher_response.content
and isinstance(web_searcher_response.content, SearchResults)
):
logger.info(f"WebSearcher menemukan {len(web_searcher_response.content.articles)} artikel.")
search_results = web_searcher_response.content
# Cache hasil pencarian jika ada
if search_results is not None:
if "search_results" not in self.session_state:
self.session_state["search_results"] = {}
self.session_state["search_results"][topic] = search_results.model_dump()
return search_results
def scrape_articles(self, search_results: SearchResults, use_scrape_cache: bool) -> Dict[str, ScrapedArticle]:
"""
Melakukan scraping pada daftar artikel hasil pencarian.
Jika use_scrape_cache True, akan menggunakan data scraping yang tersimpan jika ada.
"""
scraped_articles: Dict[str, ScrapedArticle] = {}
# Mengambil data scraping dari cache jika diizinkan dan tersedia
if (
use_scrape_cache
and "scraped_articles" in self.session_state
and isinstance(self.session_state["scraped_articles"], dict)
):
for url, scraped_article in self.session_state["scraped_articles"].items():
try:
validated_scraped_article = ScrapedArticle.model_validate(scraped_article)
scraped_articles[validated_scraped_article.url] = validated_scraped_article
except Exception as e:
logger.warning(f"Tidak dapat membaca artikel scraping dari cache: {e}")
logger.info(f"Ditemukan {len(scraped_articles)} artikel ter-scrape di cache.")
# Melakukan scraping terhadap artikel yang belum ada di cache
for article in search_results.articles:
if article.url in scraped_articles:
logger.info(f"Artikel sudah ada di cache: {article.url}")
continue
article_scraper_response: RunResponse = self.article_scraper.run(article.url)
if (
article_scraper_response
and article_scraper_response.content
and isinstance(article_scraper_response.content, ScrapedArticle)
):
scraped_articles[article_scraper_response.content.url] = article_scraper_response.content
logger.info(f"Berhasil scrape artikel: {article_scraper_response.content.url}")
# Menyimpan hasil scraping ke session_state
if "scraped_articles" not in self.session_state:
self.session_state["scraped_articles"] = {}
for url, scraped_article in scraped_articles.items():
self.session_state["scraped_articles"][url] = scraped_article.model_dump()
return scraped_articles
def write_news_report(self, topic: str, scraped_articles: Dict[str, ScrapedArticle]) -> Iterator[RunResponse]:
"""
Menggunakan agent `writer` untuk menyusun laporan akhir berdasarkan artikel yang telah di-scrape.
"""
logger.info("Menulis laporan berita")
# Mempersiapkan data yang akan diberikan ke agent writer
writer_input = {"topic": topic, "articles": [v.model_dump() for v in scraped_articles.values()]}
# Jalankan agent writer secara streaming
yield from self.writer.run(json.dumps(writer_input, indent=4), stream=True)
# Simpan laporan ke cache setelah selesai
self.add_report_to_cache(topic, self.writer.run_response.content)
def run(
self, topic: str, use_search_cache: bool = True, use_scrape_cache: bool = True, use_cached_report: bool = True
) -> Iterator[RunResponse]:
"""
Menghasilkan laporan berita komprehensif tentang suatu topik.
Workflow ini melakukan langkah-langkah berikut:
1. Memeriksa apakah sudah ada laporan yang di-cache jika use_cached_report True.
2. Mencari artikel terkait topik:
- Menggunakan cache hasil pencarian (search_results) jika diizinkan dan tersedia.
- Jika tidak, memanggil agen web_searcher untuk pencarian baru.
3. Melakukan scraping konten setiap artikel:
- Menggunakan data scraped_articles dari cache jika diizinkan dan tersedia.
- Jika tidak, melakukan scraping baru.
4. Menggunakan konten artikel yang sudah di-scrape untuk menulis laporan akhir melalui agent `writer`.
Args:
topic (str): Topik yang akan dibahas dalam laporan.
use_search_cache (bool, optional): Gunakan hasil pencarian yang sudah di-cache. Default True.
use_scrape_cache (bool, optional): Gunakan hasil scraping yang sudah di-cache. Default True.
use_cached_report (bool, optional): Gunakan laporan yang sudah di-cache untuk topik ini. Default True.
Returns:
Iterator[RunResponse]: Objek generator yang mengembalikan laporan atau status proses.
"""
logger.info(f"Memulai pembuatan laporan untuk topik: {topic}")
# Jika user mengizinkan penggunaan laporan yang sudah ada di cache
if use_cached_report:
cached_report = self.get_report_from_cache(topic)
if cached_report:
yield RunResponse(content=cached_report, event=RunEvent.workflow_completed)
return
# Mencari artikel berdasarkan topik
search_results: Optional[SearchResults] = self.get_search_results(topic, use_search_cache)
# Jika tidak ditemukan artikel sama sekali
if search_results is None or len(search_results.articles) == 0:
yield RunResponse(
event=RunEvent.workflow_completed,
content=f"Maaf, tidak menemukan artikel apa pun untuk topik: {topic}",
)
return
# Melakukan scraping pada artikel-artikel tersebut
scraped_articles: Dict[str, ScrapedArticle] = self.scrape_articles(search_results, use_scrape_cache)
# Menulis laporan komprehensif
yield from self.write_news_report(topic, scraped_articles)
# Jika file ini dieksekusi langsung (bukan diimport sebagai modul)
if __name__ == "__main__":
from rich.prompt import Prompt
# Meminta input topik dari pengguna
topic = Prompt.ask(
"[bold]Masukkan topik laporan berita[/bold]\n✨",
default="IBM Hashicorp Acquisition",
)
# Membuat string yang aman untuk URL dari topik
url_safe_topic = topic.lower().replace(" ", "-")
# Inisialisasi workflow generator laporan berita
generate_news_report = NewsReportGenerator(
session_id=f"generate-report-on-{url_safe_topic}",
storage=SqlWorkflowStorage(
table_name="generate_news_report_workflows",
db_file="tmp/workflows.db",
),
)
# Jalankan workflow dengan opsi cache diaktifkan
report_stream: Iterator[RunResponse] = generate_news_report.run(
topic=topic, use_search_cache=True, use_scrape_cache=True, use_cached_report=True
)
# Cetak hasil laporan
pprint_run_response(report_stream, markdown=True)