How to Deploy a Custom Workflow Agent

Hello everyone,

I’ve been working on a custom workflow agent (NewsReportGenerator) using the phidata, It prompts for a topic, then performs web searches, scrapes articles, and finally generates a legal-style analysis report. Everything is good in local testing.

However, I’m a bit confused about the next steps for deployment

how to integrate this workflow into the FastAPI app provided by the default phidata agent-app template, and how to make it accessible via an HTTP endpoint. I’d like to run everything inside Docker with phi ws up --group api, so others can call my agent or incorporate it into their own workflows/UI.

Here’s a summary of my current setup:

  1. File: analisis.py defines:
  • A Workflow subclass named NewsReportGenerator.
  • Three agent objects (web_searcher, article_scraper, writer).
  • A main block that runs generate_news_report.run(...) if called directly.
  1. Goal:
  • Make NewsReportGenerator available via an endpoint (e.g., POST /v1/analisis/run_workflow).
  • Deploy everything within the phidata Docker environment, so it’s production-ready.
  1. What I’ve Tried:
  • Following the official phidata docs to create a router and import the workflow.
  • Making a new analisis_router.py and registering it in v1_router.py.
  • Checking container logs to ensure everything starts up correctly.
  1. Questions:
  2. Is there a recommended or documented way (beyond the basic examples) to expose a phidata Workflow via FastAPI?
  3. After I run phi ws up --group api, how do I handle streaming outputs or longer-running workflows in a more user-friendly manner?

If anyone has a step-by-step guide or can point me to an up-to-date reference, I would really appreciate it. I’ve pieced together partial solutions from various docs, but it still feels somewhat scattered.

Thanks in advance for any help or examples you can provide!

i have put this agent into agent folder, or where i should put this file

"""
1. Pastikan untuk meng-install dependencies menggunakan: 
   `pip install openai duckduckgo-search newspaper4k lxml_html_clean sqlalchemy phidata`

2. Jalankan skrip ini dengan perintah: 
   `python cookbook/workflows/news_article_generator.py`
"""

import json
from textwrap import dedent
from typing import Optional, Dict, Iterator

from pydantic import BaseModel, Field

from phi.agent import Agent
from phi.model.openai import OpenAIChat
from phi.workflow import Workflow, RunResponse, RunEvent
from phi.storage.workflow.sqlite import SqlWorkflowStorage
from phi.tools.duckduckgo import DuckDuckGo
from phi.tools.newspaper4k import Newspaper4k
from phi.utils.pprint import pprint_run_response
from phi.utils.log import logger


class NewsArticle(BaseModel):
    """
    Representasi sebuah artikel berita dengan judul, url, dan ringkasan (jika ada).
    """
    title: str = Field(..., description="Judul artikel.")
    url: str = Field(..., description="Tautan ke artikel.")
    summary: Optional[str] = Field(..., description="Ringkasan artikel jika tersedia.")


class SearchResults(BaseModel):
    """
    Kumpulan hasil pencarian yang berisi daftar artikel.
    """
    articles: list[NewsArticle]


class ScrapedArticle(BaseModel):
    """
    Representasi artikel yang telah di-scrape dengan:
    - judul
    - tautan
    - ringkasan
    - konten dalam format markdown (jika memungkinkan).
    """
    title: str = Field(..., description="Judul artikel.")
    url: str = Field(..., description="Tautan ke artikel.")
    summary: Optional[str] = Field(..., description="Ringkasan artikel jika tersedia.")
    content: Optional[str] = Field(
        ...,
        description="Konten artikel dalam format markdown jika tersedia. Gunakan None jika konten tidak ada atau tidak relevan.",
    )


class NewsReportGenerator(Workflow):
    # Deskripsi ini hanya digunakan di antarmuka Workflow
    description: str = "Menghasilkan laporan berita komprehensif terhadap topik tertentu."

    # AGENT UNTUK PENCARIAN WEB
    web_searcher: Agent = Agent(
        model=OpenAIChat(id="gpt-4o-mini"),
        tools=[DuckDuckGo()],
        instructions=[
            "Diberikan sebuah topik, carilah 10 artikel dan kembalikan 5 artikel yang paling relevan.",
        ],
        response_model=SearchResults,
    )

    # AGENT UNTUK SCRAPING ARTIKEL
    article_scraper: Agent = Agent(
        model=OpenAIChat(id="gpt-4o-mini"),
        tools=[Newspaper4k()],
        instructions=[
            "Diberikan sebuah URL, lakukan scraping artikel tersebut. Kembalikan judul, url, dan konten berformat markdown.",
            "Jika konten tidak tersedia atau tidak masuk akal, kembalikan None untuk konten.",
        ],
        response_model=ScrapedArticle,
    )

    # AGENT UNTUK MENULIS LAPORAN (PENELITI HUKUM)
    writer: Agent = Agent(
        model=OpenAIChat(id="gpt-4o"),
        description=(
            "Anda adalah seorang Peneliti Hukum di Indonesia. "
            "Tugas Anda adalah melakukan tinjauan yuridis secara mendalam terhadap sebuah kasus "
            "berdasarkan sumber-sumber hukum yang relevan. "
            "Anda harus selalu menyertakan rujukan peraturan perundang-undangan, link sumber, "
            "dan dasar hukum resmi atau dokumen pendukung lainnya. "
            "Output yang dihasilkan harus memenuhi standar akademik atau keilmuan hukum."
        ),
        instructions=[
            "Anda akan diberikan artikel, referensi, atau informasi seputar kasus tertentu.",
            "Bacalah secara seksama untuk mendapatkan konteks hukum yang relevan di Indonesia.",
            "Selalu cantumkan pasal, undang-undang, peraturan pemerintah, atau yurisprudensi yang relevan.",
            "Tulis output dalam format analisis hukum yang baku dan sertakan sumber dengan link (jika ada) di bagian akhir.",
            "Usahakan untuk menggunakan istilah hukum yang tepat dan hindari spekulasi tanpa dasar hukum yang jelas."
        ],
        expected_output=dedent("""\
            <analysis_format>
            ## Pendahuluan
            {Jelaskan secara singkat konteks kasus dan alasan pentingnya ditinjau dari sisi hukum}

            ## Dasar Hukum
            {Rincikan peraturan perundang-undangan, pasal, yurisprudensi, atau doktrin hukum yang relevan. 
            Berikan penjelasan singkat mengenai mengapa pasal/UU tersebut berlaku.}

            ## Analisis Kasus
            {Analisis fakta-fakta yang relevan dengan ketentuan hukum di atas. 
            Jika perlu, sertakan pertimbangan yurisprudensi atau pendapat ahli lain (doktrin).}

            ## Kesimpulan
            {Simpulkan temuan atau analisis Anda. 
            Berikan keilmuan mu sesuai dengan dasar hukum.}

            ## Sumber
            - [Nama UU/Peraturan/Putusan/Website/Lembaga Terkait](URL)
            - [Nama Sumber Tambahan](URL)
            ... (tambahkan sesuai kebutuhan)
            </analysis_format>
        """),
    )

    def get_report_from_cache(self, topic: str) -> Optional[str]:
        """
        Memeriksa apakah laporan untuk suatu topik sudah ada di cache (session_state).
        """
        logger.info("Memeriksa apakah laporan sudah tersedia di cache")
        return self.session_state.get("reports", {}).get(topic)

    def add_report_to_cache(self, topic: str, report: Optional[str]):
        """
        Menyimpan laporan ke dalam cache (session_state).
        """
        logger.info(f"Menyimpan laporan untuk topik: {topic}")
        self.session_state.setdefault("reports", {})
        self.session_state["reports"][topic] = report

    def get_search_results(self, topic: str, use_search_cache: bool) -> Optional[SearchResults]:
        """
        Mendapatkan hasil pencarian artikel terkait topik. 
        Menggunakan cache jika diizinkan dan tersedia, 
        jika tidak, akan memanggil web_searcher.
        """
        search_results: Optional[SearchResults] = None

        # Mengambil hasil pencarian dari cache jika tersedia dan diperbolehkan
        if (
            use_search_cache
            and "search_results" in self.session_state
            and topic in self.session_state["search_results"]
        ):
            try:
                search_results = SearchResults.model_validate(self.session_state["search_results"][topic])
                logger.info(f"Ditemukan {len(search_results.articles)} artikel di cache.")
            except Exception as e:
                logger.warning(f"Tidak dapat membaca hasil pencarian dari cache: {e}")

        # Jika tidak ada di cache, lakukan pencarian baru
        if search_results is None:
            web_searcher_response: RunResponse = self.web_searcher.run(topic)
            if (
                web_searcher_response
                and web_searcher_response.content
                and isinstance(web_searcher_response.content, SearchResults)
            ):
                logger.info(f"WebSearcher menemukan {len(web_searcher_response.content.articles)} artikel.")
                search_results = web_searcher_response.content

        # Cache hasil pencarian jika ada
        if search_results is not None:
            if "search_results" not in self.session_state:
                self.session_state["search_results"] = {}
            self.session_state["search_results"][topic] = search_results.model_dump()

        return search_results

    def scrape_articles(self, search_results: SearchResults, use_scrape_cache: bool) -> Dict[str, ScrapedArticle]:
        """
        Melakukan scraping pada daftar artikel hasil pencarian.
        Jika use_scrape_cache True, akan menggunakan data scraping yang tersimpan jika ada.
        """
        scraped_articles: Dict[str, ScrapedArticle] = {}

        # Mengambil data scraping dari cache jika diizinkan dan tersedia
        if (
            use_scrape_cache
            and "scraped_articles" in self.session_state
            and isinstance(self.session_state["scraped_articles"], dict)
        ):
            for url, scraped_article in self.session_state["scraped_articles"].items():
                try:
                    validated_scraped_article = ScrapedArticle.model_validate(scraped_article)
                    scraped_articles[validated_scraped_article.url] = validated_scraped_article
                except Exception as e:
                    logger.warning(f"Tidak dapat membaca artikel scraping dari cache: {e}")
            logger.info(f"Ditemukan {len(scraped_articles)} artikel ter-scrape di cache.")

        # Melakukan scraping terhadap artikel yang belum ada di cache
        for article in search_results.articles:
            if article.url in scraped_articles:
                logger.info(f"Artikel sudah ada di cache: {article.url}")
                continue

            article_scraper_response: RunResponse = self.article_scraper.run(article.url)
            if (
                article_scraper_response
                and article_scraper_response.content
                and isinstance(article_scraper_response.content, ScrapedArticle)
            ):
                scraped_articles[article_scraper_response.content.url] = article_scraper_response.content
                logger.info(f"Berhasil scrape artikel: {article_scraper_response.content.url}")

        # Menyimpan hasil scraping ke session_state
        if "scraped_articles" not in self.session_state:
            self.session_state["scraped_articles"] = {}
        for url, scraped_article in scraped_articles.items():
            self.session_state["scraped_articles"][url] = scraped_article.model_dump()

        return scraped_articles

    def write_news_report(self, topic: str, scraped_articles: Dict[str, ScrapedArticle]) -> Iterator[RunResponse]:
        """
        Menggunakan agent `writer` untuk menyusun laporan akhir berdasarkan artikel yang telah di-scrape.
        """
        logger.info("Menulis laporan berita")
        # Mempersiapkan data yang akan diberikan ke agent writer
        writer_input = {"topic": topic, "articles": [v.model_dump() for v in scraped_articles.values()]}
        # Jalankan agent writer secara streaming
        yield from self.writer.run(json.dumps(writer_input, indent=4), stream=True)
        # Simpan laporan ke cache setelah selesai
        self.add_report_to_cache(topic, self.writer.run_response.content)

    def run(
        self, topic: str, use_search_cache: bool = True, use_scrape_cache: bool = True, use_cached_report: bool = True
    ) -> Iterator[RunResponse]:
        """
        Menghasilkan laporan berita komprehensif tentang suatu topik.

        Workflow ini melakukan langkah-langkah berikut:
        1. Memeriksa apakah sudah ada laporan yang di-cache jika use_cached_report True.
        2. Mencari artikel terkait topik:
           - Menggunakan cache hasil pencarian (search_results) jika diizinkan dan tersedia.
           - Jika tidak, memanggil agen web_searcher untuk pencarian baru.
        3. Melakukan scraping konten setiap artikel:
           - Menggunakan data scraped_articles dari cache jika diizinkan dan tersedia.
           - Jika tidak, melakukan scraping baru.
        4. Menggunakan konten artikel yang sudah di-scrape untuk menulis laporan akhir melalui agent `writer`.

        Args:
            topic (str): Topik yang akan dibahas dalam laporan.
            use_search_cache (bool, optional): Gunakan hasil pencarian yang sudah di-cache. Default True.
            use_scrape_cache (bool, optional): Gunakan hasil scraping yang sudah di-cache. Default True.
            use_cached_report (bool, optional): Gunakan laporan yang sudah di-cache untuk topik ini. Default True.

        Returns:
            Iterator[RunResponse]: Objek generator yang mengembalikan laporan atau status proses.
        """
        logger.info(f"Memulai pembuatan laporan untuk topik: {topic}")

        # Jika user mengizinkan penggunaan laporan yang sudah ada di cache
        if use_cached_report:
            cached_report = self.get_report_from_cache(topic)
            if cached_report:
                yield RunResponse(content=cached_report, event=RunEvent.workflow_completed)
                return

        # Mencari artikel berdasarkan topik
        search_results: Optional[SearchResults] = self.get_search_results(topic, use_search_cache)
        # Jika tidak ditemukan artikel sama sekali
        if search_results is None or len(search_results.articles) == 0:
            yield RunResponse(
                event=RunEvent.workflow_completed,
                content=f"Maaf, tidak menemukan artikel apa pun untuk topik: {topic}",
            )
            return

        # Melakukan scraping pada artikel-artikel tersebut
        scraped_articles: Dict[str, ScrapedArticle] = self.scrape_articles(search_results, use_scrape_cache)

        # Menulis laporan komprehensif
        yield from self.write_news_report(topic, scraped_articles)


# Jika file ini dieksekusi langsung (bukan diimport sebagai modul)
if __name__ == "__main__":
    from rich.prompt import Prompt

    # Meminta input topik dari pengguna
    topic = Prompt.ask(
        "[bold]Masukkan topik laporan berita[/bold]\n✨",
        default="IBM Hashicorp Acquisition",
    )

    # Membuat string yang aman untuk URL dari topik
    url_safe_topic = topic.lower().replace(" ", "-")

    # Inisialisasi workflow generator laporan berita
    generate_news_report = NewsReportGenerator(
        session_id=f"generate-report-on-{url_safe_topic}",
        storage=SqlWorkflowStorage(
            table_name="generate_news_report_workflows",
            db_file="tmp/workflows.db",
        ),
    )

    # Jalankan workflow dengan opsi cache diaktifkan
    report_stream: Iterator[RunResponse] = generate_news_report.run(
        topic=topic, use_search_cache=True, use_scrape_cache=True, use_cached_report=True
    )

    # Cetak hasil laporan
    pprint_run_response(report_stream, markdown=True)

sorry i have looking around, phidata already have fastapi built in

http://localhost:7777/docs

@isikepala Yes, templates offer an example demonstrating the use of a phidata Agent with FastAPI. Do let us know if you have any questions