In this tutorial, you’ll learn how to build an HTTP endpoint that is compatible with the AWP protocol.

Prerequisites

Make sure you have Python and Poetry installed.
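You can confirm both are available from your terminal (any recent Python 3 release should work; check the agentwire package for its exact minimum version):

python --version
poetry --version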

Set Up a New Project with Poetry

First, let’s create a new project and set up Poetry for dependency management:

poetry new my-endpoint && cd my-endpoint
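This creates a project skeleton roughly like the one below (the exact files vary slightly between Poetry versions). The my_endpoint package directory is where our endpoint code will live:

my-endpoint/
├── pyproject.toml
├── README.md
├── my_endpoint/
│   └── __init__.py
└── tests/
    └── __init__.py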

Install Dependencies

Now, let’s install the necessary packages:

poetry add agentwire openai fastapi uvicorn

Create a Basic Endpoint with FastAPI

Create a new file called my_endpoint/main.py with the following code:

from fastapi import FastAPI

app = FastAPI(title="AWP Endpoint")

@app.post("/awp")
async def my_endpoint():
    return { "message": "Hello World" }

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

Run and Test Your Endpoint

Start the server with:

poetry run uvicorn my_endpoint.main:app --reload

In another terminal, test that your endpoint is running using curl:

curl -X POST http://localhost:8000/awp

You should see the following response:

{ "message": "Hello World" }

Parsing AWP Input

Next, let’s update our endpoint to properly parse the incoming AWP request using the RunAgentInput Pydantic model:

from fastapi import FastAPI
from agentwire.core import RunAgentInput

app = FastAPI(title="AWP Endpoint")

@app.post("/awp")
async def my_endpoint(input_data: RunAgentInput):
    thread_id = input_data.thread_id

    return { "message": "Hello World from " + thread_id }

FastAPI automatically validates the incoming request against the RunAgentInput schema. If the request doesn’t match the expected format, it returns a 422 Unprocessable Entity response with details about what went wrong.
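You can see the validation in action with curl. The camelCase field names in the payload below (threadId, runId, and so on) are an assumption about the RunAgentInput schema; check the model in your installed agentwire version if the request is rejected:

# An empty body fails validation and returns a 422 with error details
curl -X POST http://localhost:8000/awp \
  -H "Content-Type: application/json" \
  -d '{}'

# A minimal payload that should pass validation (field names assumed)
curl -X POST http://localhost:8000/awp \
  -H "Content-Type: application/json" \
  -d '{"threadId": "thread_123", "runId": "run_456", "state": {}, "messages": [], "tools": [], "context": [], "forwardedProps": {}}'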

Add Event Streaming

AWP supports streaming events using Server-Sent Events (SSE). Let’s modify our /awp endpoint to stream events back to the client:

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from agentwire.core import RunAgentInput, EventType, RunStartedEvent, RunFinishedEvent
from agentwire.encoder import EventEncoder

app = FastAPI(title="AWP Endpoint")

@app.post("/awp")
async def my_endpoint(input_data: RunAgentInput):
    async def event_generator():
        # Create an event encoder to properly format SSE events
        encoder = EventEncoder()

        # Send run started event
        yield encoder.encode(
          RunStartedEvent(
            type=EventType.RUN_STARTED,
            thread_id=input_data.thread_id,
            run_id=input_data.run_id
          )
        )

        # Send run finished event
        yield encoder.encode(
          RunFinishedEvent(
            type=EventType.RUN_FINISHED,
            thread_id=input_data.thread_id,
            run_id=input_data.run_id
          )
        )

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream"
    )

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

Awesome! We are already sending RunStartedEvent and RunFinishedEvent events, which gives us a basic AWP-compliant endpoint. Now let’s make it do something useful.
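To watch the stream, restart the server and send a request with curl’s -N flag so the output isn’t buffered. The body reuses the assumed field names from earlier; each encoded event arrives as a Server-Sent Event, i.e. a data: line containing JSON followed by a blank line:

curl -N -X POST http://localhost:8000/awp \
  -H "Content-Type: application/json" \
  -d '{"threadId": "thread_123", "runId": "run_456", "state": {}, "messages": [], "tools": [], "context": [], "forwardedProps": {}}'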

Implementing Basic Chat

Let’s enhance our endpoint to call OpenAI’s API and stream the responses back as AWP events:

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from agentwire.core import (
  RunAgentInput,
  EventType,
  RunStartedEvent,
  RunFinishedEvent,
  TextMessageStartEvent,
  TextMessageContentEvent,
  TextMessageEndEvent
)
from agentwire.encoder import EventEncoder
import uuid
from openai import OpenAI

app = FastAPI(title="AWP Endpoint")

@app.post("/awp")
async def my_endpoint(input_data: RunAgentInput):
    async def event_generator():
        # Create an event encoder to properly format SSE events
        encoder = EventEncoder()

        # Send run started event
        yield encoder.encode(
          RunStartedEvent(
            type=EventType.RUN_STARTED,
            thread_id=input_data.thread_id,
            run_id=input_data.run_id
          )
        )

        # Initialize OpenAI client
        client = OpenAI()

        # Generate a message ID for the assistant's response
        message_id = str(uuid.uuid4())

        # Send text message start event
        yield encoder.encode(
            TextMessageStartEvent(
                type=EventType.TEXT_MESSAGE_START,
                message_id=message_id,
                role="assistant"
            )
        )

        # Convert the incoming AWP messages into the format the OpenAI API expects
        openai_messages = [
            {"role": message.role, "content": message.content or ""}
            for message in input_data.messages
            if message.role in ("user", "system", "assistant")
        ]

        # Create a streaming completion request
        stream = client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=openai_messages,
            stream=True
        )

        # Process the streaming response and send content events
        for chunk in stream:
            if hasattr(chunk.choices[0].delta, "content") and chunk.choices[0].delta.content:
                content = chunk.choices[0].delta.content
                yield encoder.encode(
                    TextMessageContentEvent(
                        type=EventType.TEXT_MESSAGE_CONTENT,
                        message_id=message_id,
                        delta=content
                    )
                )

        # Send text message end event
        yield encoder.encode(
            TextMessageEndEvent(
                type=EventType.TEXT_MESSAGE_END,
                message_id=message_id
            )
        )

        # Send run finished event
        yield encoder.encode(
          RunFinishedEvent(
            type=EventType.RUN_FINISHED,
            thread_id=input_data.thread_id,
            run_id=input_data.run_id
          )
        )

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream"
    )

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

You’ll need to set your OpenAI API key as an environment variable and then restart the server:

export OPENAI_API_KEY=your-api-key
poetry run uvicorn my_endpoint.main:app --reload

This implementation gives you a fully functional AWP endpoint that accepts incoming messages and streams the model’s responses back as AWP events in real time.
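To try it end to end, include a user message in the request. The shape of the messages array below is an assumption about the agentwire Message model, so adjust the field names if validation fails; you should see TEXT_MESSAGE_CONTENT events streaming back with the model’s reply:

curl -N -X POST http://localhost:8000/awp \
  -H "Content-Type: application/json" \
  -d '{
    "threadId": "thread_123",
    "runId": "run_456",
    "state": {},
    "messages": [
      {"id": "msg_1", "role": "user", "content": "Hello! What can you do?"}
    ],
    "tools": [],
    "context": [],
    "forwardedProps": {}
  }'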