Docs

Python Guide

How to call the Amdahl Platform API from Python 3.10+. There is no generated Python SDK yet; this guide walks you through a small, dependency-light wrapper you can drop into any project. A first-party package is on the roadmap.

Dependencies: requests for core calls, sseclient-py for the Server-Sent Events stream. Swap in httpx if you prefer async.

Setup

bash
pip install requests sseclient-py

Environment:

python
import os

# Deployment root and credential are read from the environment; a missing
# variable raises KeyError here rather than failing later on the first request.
AMDAHL_URL = os.environ["AMDAHL_URL"]
API_KEY = os.environ["API_KEY"]
# Drop any trailing slash so "https://host/" does not produce "https://host//api/...".
BASE = f"{AMDAHL_URL.rstrip('/')}/api/platform/v1"

The client

A single AmdahlClient class wraps auth, JSON handling, and typed errors. Every example below calls through it.

python
from __future__ import annotations

import json
import time
from dataclasses import dataclass
from typing import Any, Iterator, Optional

import requests


class AmdahlError(Exception):
    """Typed error carrying the details of a failed API call.

    ``str(err)`` renders as ``"[<status> <code>] <message>"``.

    Attributes:
        status: HTTP status code associated with the failure.
        code: Machine-readable error code.
        message: Human-readable error message.
        body: Optional payload supplied by the raiser (``None`` by default).
    """

    def __init__(
        self,
        status: int,
        code: str,
        message: str,
        body: Any = None,
    ) -> None:
        super().__init__(f"[{status} {code}] {message}")
        self.status = status
        self.code = code
        # Keep the bare message so callers need not parse it back out of str(err).
        self.message = message
        self.body = body


class AmdahlClient:
    """Minimal Amdahl Platform API client built on one ``requests.Session``.

    The session sends the bearer token and a JSON content type with every call.
    Non-2xx responses are raised as ``AmdahlError``. The client may be used as a
    context manager so the session's pooled connections are released on exit.
    """

    def __init__(
        self,
        base_url: str = BASE,
        api_key: str = API_KEY,
        timeout: float = 30.0,
    ) -> None:
        """Create the session.

        Args:
            base_url: API root; a trailing slash is stripped.
            api_key: Token sent as ``Authorization: Bearer <api_key>``.
            timeout: Per-request timeout in seconds passed to ``requests``.
        """
        self.base_url = base_url.rstrip("/")
        self.session = requests.Session()
        self.session.headers.update(
            {
                "Authorization": f"Bearer {api_key}",
                "Content-Type": "application/json",
            }
        )
        self.timeout = timeout

    def close(self) -> None:
        """Close the underlying session and its pooled connections."""
        self.session.close()

    def __enter__(self) -> "AmdahlClient":
        return self

    def __exit__(self, *exc_info: Any) -> None:
        self.close()

    def request(
        self,
        method: str,
        path: str,
        *,
        json_body: Any = None,
        params: Optional[dict[str, Any]] = None,
    ) -> Any:
        """Send one request and return the decoded JSON body.

        Args:
            method: HTTP verb.
            path: Path appended verbatim to ``base_url`` (should start with "/").
            json_body: Value serialized as the JSON request body, if any.
            params: Query-string parameters, if any.

        Returns:
            The parsed JSON payload, or ``None`` when the response body is empty.

        Raises:
            AmdahlError: On any non-2xx response, including ones whose body is
                not JSON or does not carry a well-formed ``error`` object.
        """
        url = f"{self.base_url}{path}"
        resp = self.session.request(
            method,
            url,
            json=json_body,
            params=params,
            timeout=self.timeout,
        )
        text = resp.text
        if resp.ok:
            return json.loads(text) if text else None

        default_message = f"HTTP {resp.status_code}"
        try:
            payload: Any = json.loads(text) if text else None
        except ValueError:
            # A non-JSON error body (e.g. an HTML page from a proxy) must still
            # surface as AmdahlError so status-based handling and retries work.
            # Keep body as "parsed JSON or None" and put a snippet in the message.
            payload = None
            default_message = f"{default_message}: {text[:200]}"

        error = payload.get("error") if isinstance(payload, dict) else None
        code = "unknown"
        message = default_message
        if isinstance(error, dict):
            code = error.get("code", code)
            message = error.get("message", message)
        elif isinstance(error, str) and error:
            # Tolerate {"error": "some text"} instead of crashing on .get().
            message = error
        raise AmdahlError(
            status=resp.status_code,
            code=code,
            message=message,
            body=payload,
        )

    # Shorthand verbs
    def get(self, path: str, **kwargs: Any) -> Any:
        """Issue a GET; keyword arguments are forwarded to ``request``."""
        return self.request("GET", path, **kwargs)

    def post(self, path: str, json_body: Any = None) -> Any:
        """Issue a POST with an optional JSON body."""
        return self.request("POST", path, json_body=json_body)

    def patch(self, path: str, json_body: Any = None) -> Any:
        """Issue a PATCH with an optional JSON body."""
        return self.request("PATCH", path, json_body=json_body)

    def delete(self, path: str) -> Any:
        """Issue a DELETE."""
        return self.request("DELETE", path)

Data queries

data.query runs read-only SQL against your interactions table, auto-scoped to your business. It is exposed as POST /data/query and requires data:read.

python
def query_data(client: AmdahlClient, sql: str) -> dict[str, Any]:
    """Submit a SQL statement to ``POST /data/query``.

    Args:
        client: API client used to send the request.
        sql: SQL text, sent under the ``"sql"`` key of the JSON body.

    Returns:
        Whatever ``client.post`` returns for the call.
    """
    body = {"sql": sql}
    return client.post("/data/query", json_body=body)

Usage:

python
client = AmdahlClient()

# Five busiest channels by interaction count.
top_channels_sql = (
    "SELECT channel, COUNT(*) AS events FROM interactions "
    "GROUP BY channel ORDER BY events DESC LIMIT 5"
)
result = query_data(client, top_channels_sql)
for row in result["rows"]:
    print(row["channel"], row["events"])

Pages

A Page is a workspace-authored data UI: a catalog-component spec plus named queries the host runs server-side. The CRUD verbs map onto the client below. create and update require pages:write; the reads require pages:read.

python
def list_pages(client: AmdahlClient) -> dict[str, Any]:
    """Fetch the Pages collection via ``GET /pages``."""
    pages = client.get("/pages")
    return pages


def get_page(client: AmdahlClient, page_id: str) -> dict[str, Any]:
    """Fetch one Page via ``GET /pages/{page_id}``."""
    path = "/pages/{}".format(page_id)
    return client.get(path)


def create_page(client: AmdahlClient, payload: dict[str, Any]) -> dict[str, Any]:
    """Create a Page by sending ``payload`` as the JSON body of ``POST /pages``."""
    created = client.post("/pages", json_body=payload)
    return created


def update_page(client: AmdahlClient, page_id: str, patch: dict[str, Any]) -> dict[str, Any]:
    """Send ``patch`` as the JSON body of ``PATCH /pages/{page_id}``."""
    path = "/pages/{}".format(page_id)
    return client.patch(path, json_body=patch)


def archive_page(client: AmdahlClient, page_id: str) -> None:
    """Issue ``DELETE /pages/{page_id}``; the response is discarded."""
    path = "/pages/{}".format(page_id)
    client.delete(path)

Usage:

python
client = AmdahlClient()

# Named query the spec refers to through {"$query": "by_channel"}.
by_channel_query = {
    "name": "by_channel",
    "sql": "SELECT channel, COUNT(*) AS events FROM interactions GROUP BY channel",
}
results_table = {"type": "Table", "data": {"$query": "by_channel"}}

page = create_page(
    client,
    {
        "name": "Weekly sentiment",
        "tags": ["weekly", "sentiment"],
        "declared_queries": [by_channel_query],
        "spec": {"type": "Section", "children": [results_table]},
    },
)
print(page["id"])

Agents

Helpers to start, poll, and resume an agent session, plus one that blocks until the session reaches a terminal state or pauses for input (awaiting_input).

python
# Statuses after which an agent session will not change again.
# A frozenset keeps this shared constant from being mutated by accident.
TERMINAL = frozenset({"complete", "failed", "canceled"})


def start_agent(client: AmdahlClient, *, profile_id: str, task: str, input_params: Optional[dict[str, Any]] = None) -> dict[str, Any]:
    """Start an agent run via ``POST /agents/run`` with ``"async": True``.

    Args:
        client: API client used to send the request.
        profile_id: Sent as ``"profile_id"``.
        task: Sent as ``"task"``.
        input_params: Sent as ``"input_params"``; a falsy value becomes ``{}``.

    Returns:
        Whatever ``client.post`` returns for the call.
    """
    body: dict[str, Any] = {
        "profile_id": profile_id,
        "task": task,
        "input_params": input_params if input_params else {},
        "async": True,
    }
    return client.post("/agents/run", json_body=body)


def get_agent_status(client: AmdahlClient, session_id: str) -> dict[str, Any]:
    """Fetch a session's status via ``GET /agents/{session_id}/status``."""
    path = "/agents/{}/status".format(session_id)
    return client.get(path)


def resume_agent(
    client: AmdahlClient, session_id: str, input_value: dict[str, Any]
) -> dict[str, Any]:
    """Resume a session via ``POST /agents/{session_id}/resume``.

    ``input_value`` is sent under the ``"input"`` key of the JSON body.
    """
    path = "/agents/{}/resume".format(session_id)
    body = {"input": input_value}
    return client.post(path, json_body=body)


def wait_for_agent(
    client: AmdahlClient,
    session_id: str,
    *,
    interval_s: float = 3.0,
    timeout_s: float = 600.0,
) -> dict[str, Any]:
    """Poll a session until it is terminal or waiting for input.

    Args:
        client: API client used for the status calls.
        session_id: Session to poll.
        interval_s: Seconds to sleep between polls.
        timeout_s: Overall time budget in seconds.

    Returns:
        The first status payload whose ``"status"`` is in ``TERMINAL`` or equals
        ``"awaiting_input"``.

    Raises:
        TimeoutError: If the budget is used up before such a status is seen.
    """
    deadline = time.monotonic() + timeout_s
    while True:
        status = get_agent_status(client, session_id)
        state = status["status"]
        if state in TERMINAL or state == "awaiting_input":
            return status
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            raise TimeoutError(f"agent {session_id} timed out")
        # Never sleep past the deadline: the last poll then lands at the budget
        # instead of up to one full interval after it.
        time.sleep(min(interval_s, remaining))

Stream progress (SSE)

sseclient-py parses the SSE framing; we open the request with stream=True.

python
from sseclient import SSEClient


def stream_agent(client: AmdahlClient, session_id: str) -> Iterator[dict[str, Any]]:
    """Yield decoded JSON events from ``GET /agents/{session_id}/stream``.

    The request goes through ``client.session`` so the auth header is reused,
    and is opened with ``stream=True`` and no timeout because the SSE
    connection stays open. Events with empty data are skipped.

    Raises:
        requests.HTTPError: If the stream request returns an error status.
    """
    # Use the underlying session to keep auth; sseclient needs the raw response.
    url = f"{client.base_url}/agents/{session_id}/stream"
    # The context manager closes the response when the stream ends or when the
    # consumer stops iterating early, so the connection is not leaked.
    with client.session.get(url, stream=True, timeout=None) as response:
        response.raise_for_status()
        for msg in SSEClient(response).events():
            if not msg.data:
                continue
            yield json.loads(msg.data)

Usage:

python
# Event types after which this loop stops reading the stream.
finished = {"session.completed", "session.failed"}
for event in stream_agent(client, session_id):
    event_type = event["type"]
    print(event_type, event.get("data"))
    if event_type in finished:
        break

If you prefer httpx with streaming instead, the pattern is nearly identical:

python
import httpx

def stream_agent_httpx(session_id: str) -> Iterator[dict[str, Any]]:
    """Yield decoded JSON events from the agent stream using ``httpx``.

    Frames are separated by a blank line. Within a frame, every ``data:`` field
    is collected (one optional leading space removed) and joined with newlines
    before JSON decoding. Frames without data are skipped.

    Raises:
        httpx.HTTPStatusError: If the stream request returns an error status.
    """
    headers = {"Authorization": f"Bearer {API_KEY}"}
    url = f"{BASE}/agents/{session_id}/stream"
    with httpx.stream("GET", url, headers=headers, timeout=None) as r:
        # Fail loudly on 4xx/5xx instead of parsing an error body as SSE.
        r.raise_for_status()
        buffer = ""
        for chunk in r.iter_text():
            # Normalize CRLF on the whole buffer so a "\r\n" pair that straddles
            # two chunks is still recognized.
            buffer = (buffer + chunk).replace("\r\n", "\n")
            while "\n\n" in buffer:
                frame, buffer = buffer.split("\n\n", 1)
                data_lines = []
                # Split on "\n" only: str.splitlines() also breaks on characters
                # such as U+2028 that may legally appear inside JSON strings.
                for line in frame.split("\n"):
                    if line.startswith("data:"):
                        value = line[5:]
                        data_lines.append(value[1:] if value.startswith(" ") else value)
                data = "\n".join(data_lines)
                if data:
                    yield json.loads(data)

Webhooks

Register and manage

python
def register_webhook(
    client: AmdahlClient,
    *,
    url: str,
    events: list[str],
    description: Optional[str] = None,
) -> dict[str, Any]:
    """Register a webhook via ``POST /webhooks``.

    Args:
        client: API client used to send the request.
        url: Sent as ``"url"``.
        events: Sent as ``"events"``.
        description: Sent as ``"description"`` only when truthy.

    Returns:
        Whatever ``client.post`` returns for the call.
    """
    optional: dict[str, Any] = {"description": description} if description else {}
    return client.post("/webhooks", json_body={"url": url, "events": events, **optional})


def list_webhooks(client: AmdahlClient) -> dict[str, Any]:
    """Fetch the registered webhooks via ``GET /webhooks``."""
    webhooks = client.get("/webhooks")
    return webhooks


def update_webhook(
    client: AmdahlClient, webhook_id: str, patch: dict[str, Any]
) -> dict[str, Any]:
    """Send ``patch`` as the JSON body of ``PATCH /webhooks/{webhook_id}``."""
    path = "/webhooks/{}".format(webhook_id)
    return client.patch(path, json_body=patch)


def delete_webhook(client: AmdahlClient, webhook_id: str) -> None:
    """Issue ``DELETE /webhooks/{webhook_id}``; the response is discarded."""
    path = "/webhooks/{}".format(webhook_id)
    client.delete(path)

HMAC verification

Use inside your receiver (Flask, FastAPI, etc.). Pass in the raw request bytes.

python
import hashlib
import hmac


def verify_webhook_signature(raw_body: bytes, signature_header: Optional[str], secret: str) -> bool:
    """Check a webhook signature against the HMAC-SHA256 of the raw body.

    Args:
        raw_body: Request body exactly as received, before any parsing.
        signature_header: Value of the signature header, or ``None`` if absent.
        secret: Shared secret used as the HMAC key.

    Returns:
        ``True`` only when ``signature_header`` equals the hex digest of
        ``HMAC-SHA256(secret, raw_body)``; ``False`` for a missing, empty, or
        non-matching header.
    """
    if not signature_header:
        return False
    expected = hmac.new(secret.encode(), raw_body, hashlib.sha256).hexdigest()
    # compare_digest raises TypeError for str arguments with non-ASCII
    # characters. The header is attacker-controlled, so compare bytes to turn
    # such input into a plain mismatch instead of an unhandled exception.
    return hmac.compare_digest(
        expected.encode("ascii"),
        signature_header.encode("utf-8", "backslashreplace"),
    )

Minimal Flask receiver:

python
from flask import Flask, request, jsonify

app = Flask(__name__)
# Shared secret used to verify webhook signatures; KeyError at startup if unset.
WEBHOOK_SECRET = os.environ["WEBHOOK_SECRET"]


@app.post("/hooks/amdahl")
def hook() -> Any:
    """Receive a webhook: verify the signature, parse the body, acknowledge.

    Returns:
        401 when the signature check fails, 400 when the body is not valid
        JSON, otherwise 202 with ``{"received": true}``.
    """
    # Verify against the raw bytes; re-serialized JSON would not match the HMAC.
    raw = request.get_data()
    sig = request.headers.get("X-Webhook-Signature")
    if not verify_webhook_signature(raw, sig, WEBHOOK_SECRET):
        return jsonify({"error": "invalid_signature"}), 401
    try:
        event = json.loads(raw)
    except ValueError:
        # Signed but malformed payload: answer 400 instead of an unhandled 500.
        return jsonify({"error": "invalid_json"}), 400
    # Acknowledge fast, enqueue follow-up work.
    # Example: enqueue_task(event)
    return jsonify({"received": True}), 202

Full end-to-end pattern: recipes/react-to-webhook.md.

Error handling pattern

Branch on AmdahlError.code for machine-readable cases; log .body for anything unexpected.

python
try:
    start_agent(client, profile_id="content_writer", task="")
except AmdahlError as err:
    if err.code == "invalid_argument":
        # Machine-readable code: the request itself was rejected.
        print("bad input:", err)
    elif err.status != 429:
        # Anything unexpected: log the full body.
        print("amdahl error:", err.status, err.code, err.body)
    else:
        # Rate limited: use the server's hint, falling back to 5 seconds.
        retry_after = (err.body or {}).get("retry_after", 5)
        print(f"rate limited; retry in {retry_after}s")

Full reference: api-reference/errors.md.

Retry helper

Retry 429 and 5xx with exponential backoff.

python
import random


def with_retry(fn, *, max_attempts: int = 4, base_s: float = 0.5):
    """Call ``fn`` and retry it on HTTP 429 / 5xx ``AmdahlError``.

    Args:
        fn: Zero-argument callable to invoke.
        max_attempts: Total number of calls allowed, including the first.
        base_s: Delay before the first retry in seconds; doubles on each retry,
            plus up to 0.2s of random jitter.

    Returns:
        Whatever ``fn`` returns on its first successful call.

    Raises:
        ValueError: If ``max_attempts`` is less than 1.
        AmdahlError: Re-raised as soon as the error is not retryable, or after
            the final attempt.
    """
    if max_attempts < 1:
        # Previously this fell through and returned None without calling fn.
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except AmdahlError as err:
            retryable = err.status == 429 or err.status >= 500
            if not retryable or attempt == max_attempts:
                raise
        delay = base_s * (2 ** (attempt - 1)) + random.random() * 0.2
        time.sleep(delay)

Pagination

Cursor-paginated list endpoints return next_cursor (null at the end). Wrap any of them in a generator like this; swap in the resource path and the array key it returns:

python
def iter_pages(
    client: AmdahlClient,
    items_key: str = "items",
    *,
    path: str = "/<list-endpoint>",
    limit: int = 100,
) -> Iterator[dict[str, Any]]:
    """Yield every row from a cursor-paginated list endpoint.

    Args:
        client: API client used for the GET calls.
        items_key: Key of the row array in each response page.
        path: Resource path of the list endpoint. The default is the
            placeholder and must be replaced with a real path.
        limit: Page size sent as the ``limit`` query parameter.

    Yields:
        Each row of each page, in server order, until a page has no
        ``next_cursor``.
    """
    cursor: Optional[str] = None
    while True:
        params: dict[str, Any] = {"limit": limit}
        if cursor:
            params["cursor"] = cursor
        page = client.get(path, params=params)
        yield from page.get(items_key, [])
        cursor = page.get("next_cursor")
        if not cursor:
            return

Full details: pagination-errors.md.

Tips

  • Keep API_KEY server-side only. Never ship it to a browser or a mobile bundle.
  • Use requests.Session (we already do) so connections are pooled. Each AmdahlClient keeps one open.
  • Set a timeout on every call. The client defaults to 30s; streaming calls pass timeout=None deliberately because SSE holds the connection open.
  • Prefer async-start + polling or SSE over async=False. The 60s sync wait often times out.
  • Check response.headers for rate-limit metadata. X-RateLimit-Remaining and X-RateLimit-Reset are returned in the Supabase-layer response headers. See rate-limits.md.
  • Pin the API version. Send X-Amdahl-API-Version as described in versioning.md so a server upgrade never breaks your deploy.

See also