# Python

# Python Guide

How to call the Amdahl Platform API from Python 3.10+. There is no generated Python SDK yet; this guide walks you through a small, dependency-light wrapper you can drop into any project. A first-party package is on the roadmap.

Dependencies: `requests` for core calls, `sseclient-py` for the Server-Sent Events stream. Swap in `httpx` if you prefer async.

## Setup

```bash
pip install requests sseclient-py
```

Environment:

```python
import os

AMDAHL_URL = os.environ["AMDAHL_URL"]
API_KEY = os.environ["API_KEY"]
BASE = f"{AMDAHL_URL}/api/platform/v1"
```

## The client

A single `AmdahlClient` class wraps auth, JSON handling, and typed errors. Every example below calls through it.

```python
from __future__ import annotations

import json
import time
from dataclasses import dataclass
from typing import Any, Iterator, Optional

import requests


class AmdahlError(Exception):
    def __init__(
        self,
        status: int,
        code: str,
        message: str,
        body: Any = None,
    ) -> None:
        super().__init__(f"[{status} {code}] {message}")
        self.status = status
        self.code = code
        self.body = body


class AmdahlClient:
    def __init__(
        self,
        base_url: str = BASE,
        api_key: str = API_KEY,
        timeout: float = 30.0,
    ) -> None:
        self.base_url = base_url.rstrip("/")
        self.session = requests.Session()
        self.session.headers.update(
            {
                "Authorization": f"Bearer {api_key}",
                "Content-Type": "application/json",
            }
        )
        self.timeout = timeout

    def request(
        self,
        method: str,
        path: str,
        *,
        json_body: Any = None,
        params: Optional[dict[str, Any]] = None,
    ) -> Any:
        url = f"{self.base_url}{path}"
        resp = self.session.request(
            method,
            url,
            json=json_body,
            params=params,
            timeout=self.timeout,
        )
        text = resp.text
        payload: Any = json.loads(text) if text else None
        if not resp.ok:
            err = (payload or {}).get("error", {}) if isinstance(payload, dict) else {}
            raise AmdahlError(
                status=resp.status_code,
                code=err.get("code", "unknown"),
                message=err.get("message", f"HTTP {resp.status_code}"),
                body=payload,
            )
        return payload

    # Shorthand verbs
    def get(self, path: str, **kwargs: Any) -> Any:
        return self.request("GET", path, **kwargs)

    def post(self, path: str, json_body: Any = None) -> Any:
        return self.request("POST", path, json_body=json_body)

    def patch(self, path: str, json_body: Any = None) -> Any:
        return self.request("PATCH", path, json_body=json_body)

    def delete(self, path: str) -> Any:
        return self.request("DELETE", path)
```

## Data queries

`data.query` runs read-only SQL against your `interactions` table, auto-scoped to your business. It is a `POST /data/query` and requires `data:read`.

```python
def query_data(client: AmdahlClient, sql: str) -> dict[str, Any]:
    return client.post("/data/query", json_body={"sql": sql})
```

Usage:

```python
client = AmdahlClient()

result = query_data(
    client,
    "SELECT channel, COUNT(*) AS events FROM interactions "
    "GROUP BY channel ORDER BY events DESC LIMIT 5",
)
for row in result["rows"]:
    print(row["channel"], row["events"])
```

## Pages

A Page is a workspace-authored data UI: a catalog-component spec plus named queries the host runs server-side. The CRUD verbs map onto the client below. `create` and `update` require `pages:write`; the reads require `pages:read`.

```python
def list_pages(client: AmdahlClient) -> dict[str, Any]:
    return client.get("/pages")


def get_page(client: AmdahlClient, page_id: str) -> dict[str, Any]:
    return client.get(f"/pages/{page_id}")


def create_page(client: AmdahlClient, payload: dict[str, Any]) -> dict[str, Any]:
    return client.post("/pages", json_body=payload)


def update_page(client: AmdahlClient, page_id: str, patch: dict[str, Any]) -> dict[str, Any]:
    return client.patch(f"/pages/{page_id}", json_body=patch)


def archive_page(client: AmdahlClient, page_id: str) -> None:
    client.delete(f"/pages/{page_id}")
```

Usage:

```python
client = AmdahlClient()

page = create_page(
    client,
    {
        "name": "Weekly sentiment",
        "tags": ["weekly", "sentiment"],
        "declared_queries": [
            {"name": "by_channel", "sql": "SELECT channel, COUNT(*) AS events FROM interactions GROUP BY channel"}
        ],
        "spec": {
            "type": "Section",
            "children": [{"type": "Table", "data": {"$query": "by_channel"}}],
        },
    },
)
print(page["id"])
```

## Agents

Start, poll, resume, and a helper that blocks until a terminal (or paused) state.

```python
TERMINAL = {"complete", "failed", "canceled"}


def start_agent(client: AmdahlClient, *, profile_id: str, task: str, input_params: Optional[dict[str, Any]] = None) -> dict[str, Any]:
    return client.post(
        "/agents/run",
        json_body={
            "profile_id": profile_id,
            "task": task,
            "input_params": input_params or {},
            "async": True,
        },
    )


def get_agent_status(client: AmdahlClient, session_id: str) -> dict[str, Any]:
    return client.get(f"/agents/{session_id}/status")


def resume_agent(
    client: AmdahlClient, session_id: str, input_value: dict[str, Any]
) -> dict[str, Any]:
    return client.post(
        f"/agents/{session_id}/resume", json_body={"input": input_value}
    )


def wait_for_agent(
    client: AmdahlClient,
    session_id: str,
    *,
    interval_s: float = 3.0,
    timeout_s: float = 600.0,
) -> dict[str, Any]:
    deadline = time.monotonic() + timeout_s
    while True:
        status = get_agent_status(client, session_id)
        if status["status"] in TERMINAL or status["status"] == "awaiting_input":
            return status
        if time.monotonic() > deadline:
            raise TimeoutError(f"agent {session_id} timed out")
        time.sleep(interval_s)
```

### Stream progress (SSE)

`sseclient-py` parses the SSE framing; we open the request with `stream=True`.

```python
from sseclient import SSEClient


def stream_agent(client: AmdahlClient, session_id: str) -> Iterator[dict[str, Any]]:
    # Use the underlying session to keep auth; sseclient needs the raw response.
    url = f"{client.base_url}/agents/{session_id}/stream"
    response = client.session.get(url, stream=True, timeout=None)
    response.raise_for_status()
    sse = SSEClient(response)
    for msg in sse.events():
        if not msg.data:
            continue
        yield json.loads(msg.data)
```

Usage:

```python
for event in stream_agent(client, session_id):
    print(event["type"], event.get("data"))
    if event["type"] in {"session.completed", "session.failed"}:
        break
```

If you prefer `httpx` with streaming instead, the pattern is nearly identical:

```python
import httpx

def stream_agent_httpx(session_id: str) -> Iterator[dict[str, Any]]:
    headers = {"Authorization": f"Bearer {API_KEY}"}
    with httpx.stream("GET", f"{BASE}/agents/{session_id}/stream", headers=headers, timeout=None) as r:
        buffer = ""
        for chunk in r.iter_text():
            buffer += chunk
            while "\n\n" in buffer:
                frame, buffer = buffer.split("\n\n", 1)
                for line in frame.splitlines():
                    if line.startswith("data: "):
                        yield json.loads(line[6:])
```

## Webhooks

### Register and manage

```python
def register_webhook(
    client: AmdahlClient,
    *,
    url: str,
    events: list[str],
    description: Optional[str] = None,
) -> dict[str, Any]:
    payload: dict[str, Any] = {"url": url, "events": events}
    if description:
        payload["description"] = description
    return client.post("/webhooks", json_body=payload)


def list_webhooks(client: AmdahlClient) -> dict[str, Any]:
    return client.get("/webhooks")


def update_webhook(
    client: AmdahlClient, webhook_id: str, patch: dict[str, Any]
) -> dict[str, Any]:
    return client.patch(f"/webhooks/{webhook_id}", json_body=patch)


def delete_webhook(client: AmdahlClient, webhook_id: str) -> None:
    client.delete(f"/webhooks/{webhook_id}")
```

### HMAC verification

Use inside your receiver (Flask, FastAPI, etc.). Pass in the raw request bytes.

```python
import hashlib
import hmac


def verify_webhook_signature(raw_body: bytes, signature_header: Optional[str], secret: str) -> bool:
    if not signature_header:
        return False
    expected = hmac.new(secret.encode(), raw_body, hashlib.sha256).hexdigest()
    return hmac.compare_digest(expected, signature_header)
```

Minimal Flask receiver:

```python
from flask import Flask, request, jsonify

app = Flask(__name__)
WEBHOOK_SECRET = os.environ["WEBHOOK_SECRET"]


@app.post("/hooks/amdahl")
def hook() -> Any:
    raw = request.get_data()
    sig = request.headers.get("X-Webhook-Signature")
    if not verify_webhook_signature(raw, sig, WEBHOOK_SECRET):
        return jsonify({"error": "invalid_signature"}), 401
    event = json.loads(raw)
    # Acknowledge fast, enqueue follow-up work.
    # Example: enqueue_task(event)
    return jsonify({"received": True}), 202
```

Full end-to-end pattern: [recipes/react-to-webhook.md](../recipes/react-to-webhook.md).

## Error handling pattern

Branch on `AmdahlError.code` for machine-readable cases; log `.body` for anything unexpected.

```python
try:
    start_agent(client, profile_id="content_writer", task="")
except AmdahlError as err:
    if err.code == "invalid_argument":
        print("bad input:", err)
    elif err.status == 429:
        retry_after = (err.body or {}).get("retry_after", 5)
        print(f"rate limited; retry in {retry_after}s")
    else:
        print("amdahl error:", err.status, err.code, err.body)
```

Full reference: [api-reference/errors.md](../api-reference/errors.md).

## Retry helper

Retry 429 and 5xx with exponential backoff.

```python
import random


def with_retry(fn, *, max_attempts: int = 4, base_s: float = 0.5):
    last: Exception | None = None
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except AmdahlError as err:
            last = err
            retryable = err.status == 429 or err.status >= 500
            if not retryable or attempt == max_attempts:
                raise
            delay = base_s * (2 ** (attempt - 1)) + random.random() * 0.2
            time.sleep(delay)
    if last:
        raise last
```

## Pagination

Cursor-paginated list endpoints return `next_cursor` (null at the end). Wrap any of them in a generator like this; swap in the resource path and the array key it returns:

```python
def iter_pages(client: AmdahlClient, items_key: str = "items") -> Iterator[dict[str, Any]]:
    cursor: Optional[str] = None
    while True:
        params: dict[str, Any] = {"limit": 100}
        if cursor:
            params["cursor"] = cursor
        page = client.get("/<list-endpoint>", params=params)
        for row in page.get(items_key, []):
            yield row
        cursor = page.get("next_cursor")
        if not cursor:
            return
```

Full details: [pagination-errors.md](../pagination-errors.md).

## Tips

- **Keep `API_KEY` server-side only.** Never ship it to a browser or a mobile bundle.
- **Use `requests.Session` (we already do)** so connections are pooled. Each `AmdahlClient` keeps one open.
- **Set a timeout on every call.** The client defaults to 30s; streaming calls pass `timeout=None` deliberately because SSE holds the connection open.
- **Prefer async-start + polling or SSE** over `async=False`. The 60s sync wait often times out.
- **Check `response.headers` for rate-limit metadata.** `X-RateLimit-Remaining` and `X-RateLimit-Reset` are in the envelope Supabase-layer headers. See [rate-limits.md](../rate-limits.md).
- **Pin the API version.** Send `X-Amdahl-API-Version` as described in [versioning.md](../versioning.md) so a server upgrade never breaks your deploy.

## See also

- [quickstart.md](../quickstart.md) for a copy-paste first call.
- [sdk/curl.md](./curl.md) for the raw HTTP equivalent.
- [sdk/typescript.md](./typescript.md) for the Node.js equivalent.
- [recipes/run-research.md](../recipes/run-research.md) for an end-to-end research flow using these helpers.
- [api-reference/README.md](../api-reference/README.md) for the full endpoint catalog.
