- Python
- Node
Prerequisites
Make sure to have Python and Poetry installed.

Setup a New Project with Poetry
First, let’s create a new project and set up Poetry for dependency management:Copy
poetry new my-endpoint && cd my-endpoint
Install Dependencies
Now, let’s install the necessary packages:Copy
poetry add ag-ui openai fastapi uvicorn
Create a Basic Endpoint with FastAPI
Create a new file called `my_endpoint/main.py` with the following code:
from fastapi import FastAPI, Request
import json
from ag_ui.core.types import RunAgentInput

# NOTE: Request, json and RunAgentInput are unused in this first step;
# later steps of the tutorial build on them.
app = FastAPI(title="AG-UI Endpoint")


@app.post("/awp")
async def my_endpoint():
    """Placeholder AG-UI endpoint; returns a static greeting."""
    return {"message": "Hello World"}


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)
Run and Test Your Endpoint
Start the server with:Copy
poetry run uvicorn my_endpoint.main:app --reload
Copy
curl -X POST http://localhost:8000/awp
Copy
{ "message": "Hello World" }
Parsing AG-UI Input
Next let’s update our endpoint to properly parse the incoming AG-UI request using the RunAgentInput Pydantic model:
from fastapi import FastAPI, Request, HTTPException
from ag_ui.core import RunAgentInput, Message

app = FastAPI(title="AG-UI Endpoint")


@app.post("/awp")
async def my_endpoint(input_data: RunAgentInput):
    """Parse the AG-UI request body and echo back its thread id.

    FastAPI validates the body against the RunAgentInput Pydantic model
    and returns 422 automatically on malformed input.
    """
    thread_id = input_data.thread_id
    return {"message": "Hello World from " + thread_id}
Add Event Streaming
AG-UI supports streaming events using Server-Sent Events (SSE). Let’s modify our /awp endpoint to stream events back to the client:
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import StreamingResponse
from ag_ui.core import RunAgentInput, Message, EventType, RunStartedEvent, RunFinishedEvent
from ag_ui.encoder import EventEncoder

app = FastAPI(title="AG-UI Endpoint")


@app.post("/awp")
async def my_endpoint(input_data: RunAgentInput):
    """Stream AG-UI run lifecycle events back to the client over SSE."""

    async def event_generator():
        # Create an event encoder to properly format SSE events
        encoder = EventEncoder()

        # Send run started event
        yield encoder.encode(
            RunStartedEvent(
                type=EventType.RUN_STARTED,
                thread_id=input_data.thread_id,
                run_id=input_data.run_id,
            )
        )

        # Send run finished event
        yield encoder.encode(
            RunFinishedEvent(
                type=EventType.RUN_FINISHED,
                thread_id=input_data.thread_id,
                run_id=input_data.run_id,
            )
        )

    return StreamingResponse(event_generator(), media_type="text/event-stream")


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)
The endpoint now emits RunStartedEvent and RunFinishedEvent events, which gives us a basic AG-UI compliant endpoint. Now let’s make it do something useful.

Implementing Basic Chat
Let’s enhance our endpoint to call OpenAI’s API and stream the responses back as AG-UI events:Copy
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
from ag_ui.core import (
    RunAgentInput,
    Message,
    EventType,
    RunStartedEvent,
    RunFinishedEvent,
    TextMessageStartEvent,
    TextMessageContentEvent,
    TextMessageEndEvent
)
from ag_ui.encoder import EventEncoder
import uuid
from openai import OpenAI

app = FastAPI(title="AG-UI Endpoint")


@app.post("/awp")
async def my_endpoint(input_data: RunAgentInput):
    """Call OpenAI with the conversation history and stream the reply
    back to the client as AG-UI text-message events over SSE."""

    async def event_generator():
        # Create an event encoder to properly format SSE events
        encoder = EventEncoder()

        # Send run started event
        yield encoder.encode(
            RunStartedEvent(
                type=EventType.RUN_STARTED,
                thread_id=input_data.thread_id,
                run_id=input_data.run_id,
            )
        )

        # Initialize OpenAI client (reads OPENAI_API_KEY from the environment)
        client = OpenAI()

        # Convert AG-UI messages to the OpenAI chat format, keeping only
        # the roles the chat completions API accepts.
        # (Fix: this list was referenced below but never built.)
        openai_messages = [
            {"role": msg.role, "content": msg.content or ""}
            for msg in input_data.messages
            if msg.role in ("user", "system", "assistant")
        ]

        # Generate a message ID for the assistant's response.
        # Fix: the event fields expect a string, so stringify the UUID.
        message_id = str(uuid.uuid4())

        # Send text message start event
        yield encoder.encode(
            TextMessageStartEvent(
                type=EventType.TEXT_MESSAGE_START,
                message_id=message_id,
                role="assistant",
            )
        )

        # Create a streaming completion request
        stream = client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=openai_messages,
            stream=True,
        )

        # Process the streaming response and send content events
        for chunk in stream:
            if hasattr(chunk.choices[0].delta, "content") and chunk.choices[0].delta.content:
                content = chunk.choices[0].delta.content
                yield encoder.encode(
                    TextMessageContentEvent(
                        type=EventType.TEXT_MESSAGE_CONTENT,
                        message_id=message_id,
                        delta=content,
                    )
                )

        # Send text message end event
        yield encoder.encode(
            TextMessageEndEvent(
                type=EventType.TEXT_MESSAGE_END,
                message_id=message_id,
            )
        )

        # Send run finished event
        yield encoder.encode(
            RunFinishedEvent(
                type=EventType.RUN_FINISHED,
                thread_id=input_data.thread_id,
                run_id=input_data.run_id,
            )
        )

    return StreamingResponse(event_generator(), media_type="text/event-stream")


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)
Copy
export OPENAI_API_KEY=your-api-key
poetry run uvicorn my_endpoint.main:app --reload
Prerequisites
Make sure to have Node.js (v16 or later) and npm or yarn installed.

Setup a New Project
First, let’s create a new project and set up npm with TypeScript:Copy
mkdir awp-endpoint && cd awp-endpoint
npm init -y
npm install typescript ts-node @types/node @types/express --save-dev
npx tsc --init
Install Dependencies
Install the necessary packages:Copy
npm install express openai @ag-ui/core @ag-ui/encoder uuid
npm install @types/uuid --save-dev
Create a Basic Endpoint with Express
Create a new file called `src/server.ts` with the following code:
import express from "express"
import { Request, Response } from "express"

const app = express()

// Accept JSON request bodies
app.use(express.json())

// Placeholder AG-UI endpoint returning a static greeting
app.post("/awp", (req: Request, res: Response) => {
  res.json({ message: "Hello World" })
})

app.listen(8000, () => {
  console.log("Server running on http://localhost:8000")
})
Run and Test Your Endpoint
Start the server with:Copy
npx ts-node src/server.ts
Copy
curl -X POST http://localhost:8000/awp
Copy
{ "message": "Hello World" }
Parsing AG-UI Input
Next let’s update our endpoint to properly parse the incoming AG-UI request using the RunAgentInput schema:
import express, { Request, Response } from "express"
import { RunAgentInputSchema, RunAgentInput } from "@ag-ui/core"

const app = express()
app.use(express.json())

app.post("/awp", (req: Request, res: Response) => {
  try {
    // Validate the body against the AG-UI schema; throws on bad input
    const input: RunAgentInput = RunAgentInputSchema.parse(req.body)
    res.json({ message: `Hello World from ${input.threadId}` })
  } catch (error) {
    // Schema validation failed — report it as an unprocessable entity
    res.status(422).json({ error: (error as Error).message })
  }
})

app.listen(8000, () => {
  console.log("Server running on http://localhost:8000")
})
Add Event Streaming
AG-UI supports streaming events using Server-Sent Events (SSE). Let’s modify our /awp endpoint to stream events back to the client:
import express, { Request, Response } from "express"
import { RunAgentInputSchema, RunAgentInput, EventType } from "@ag-ui/core"
import { EventEncoder } from "@ag-ui/encoder"

const app = express()
app.use(express.json())

app.post("/awp", async (req: Request, res: Response) => {
  try {
    // Validate the body against the AG-UI schema; throws on bad input
    const input: RunAgentInput = RunAgentInputSchema.parse(req.body)

    // Switch the response into Server-Sent Events mode
    res.setHeader("Content-Type", "text/event-stream")
    res.setHeader("Cache-Control", "no-cache")
    res.setHeader("Connection", "keep-alive")

    // Encoder formats AG-UI events as SSE frames
    const encoder = new EventEncoder()

    // Emit the run lifecycle: started, then finished
    res.write(
      encoder.encode({
        type: EventType.RUN_STARTED,
        threadId: input.threadId,
        runId: input.runId,
      })
    )
    res.write(
      encoder.encode({
        type: EventType.RUN_FINISHED,
        threadId: input.threadId,
        runId: input.runId,
      })
    )

    // Close the SSE stream
    res.end()
  } catch (error) {
    res.status(422).json({ error: (error as Error).message })
  }
})

app.listen(8000, () => {
  console.log("Server running on http://localhost:8000")
})
Implementing Basic Chat
Let’s enhance our endpoint to call OpenAI’s API and stream the responses back as AG-UI events:Copy
import express, { Request, Response } from "express"
import {
  RunAgentInputSchema,
  RunAgentInput,
  EventType,
  Message,
} from "@ag-ui/core"
import { EventEncoder } from "@ag-ui/encoder"
import { OpenAI } from "openai"
import { v4 as uuidv4 } from "uuid"

const app = express()
app.use(express.json())

app.post("/awp", async (req: Request, res: Response) => {
  try {
    // Parse and validate the request body
    const input: RunAgentInput = RunAgentInputSchema.parse(req.body)

    // Set up SSE headers
    res.setHeader("Content-Type", "text/event-stream")
    res.setHeader("Cache-Control", "no-cache")
    res.setHeader("Connection", "keep-alive")

    // Create an event encoder
    const encoder = new EventEncoder()

    // Send run started event
    const runStarted = {
      type: EventType.RUN_STARTED,
      threadId: input.threadId,
      runId: input.runId,
    }
    res.write(encoder.encode(runStarted))

    // Initialize OpenAI client (reads OPENAI_API_KEY from the environment)
    const client = new OpenAI()

    // Convert AG-UI messages to OpenAI messages format, keeping only
    // the roles the chat completions API accepts
    const openaiMessages = input.messages
      .filter((msg: Message) =>
        ["user", "system", "assistant"].includes(msg.role)
      )
      .map((msg: Message) => ({
        role: msg.role as "user" | "system" | "assistant",
        content: msg.content || "",
      }))

    // Generate a message ID for the assistant's response
    const messageId = uuidv4()

    // Send text message start event
    const textMessageStart = {
      type: EventType.TEXT_MESSAGE_START,
      messageId,
      role: "assistant",
    }
    res.write(encoder.encode(textMessageStart))

    // Create a streaming completion request
    const stream = await client.chat.completions.create({
      model: "gpt-3.5-turbo",
      messages: openaiMessages,
      stream: true,
    })

    // Process the streaming response and send content events
    for await (const chunk of stream) {
      if (chunk.choices[0]?.delta?.content) {
        const content = chunk.choices[0].delta.content
        const textMessageContent = {
          type: EventType.TEXT_MESSAGE_CONTENT,
          messageId,
          delta: content,
        }
        res.write(encoder.encode(textMessageContent))
      }
    }

    // Send text message end event
    const textMessageEnd = {
      type: EventType.TEXT_MESSAGE_END,
      messageId,
    }
    res.write(encoder.encode(textMessageEnd))

    // Send run finished event
    const runFinished = {
      type: EventType.RUN_FINISHED,
      threadId: input.threadId,
      runId: input.runId,
    }
    res.write(encoder.encode(runFinished))

    // End the response
    res.end()
  } catch (error) {
    // Fix: once SSE headers/body have been written (e.g. the OpenAI call
    // fails mid-stream), res.status() would throw "headers already sent";
    // in that case just terminate the stream.
    if (res.headersSent) {
      res.end()
    } else {
      res.status(422).json({ error: (error as Error).message })
    }
  }
})

app.listen(8000, () => {
  console.log("Server running on http://localhost:8000")
})
Copy
export OPENAI_API_KEY=your-api-key
npx ts-node src/server.ts