Python Guide
How to call the Amdahl Platform API from Python 3.10+. There is no generated Python SDK yet; this guide walks you through a small, dependency-light wrapper you can drop into any project. A first-party package is on the roadmap.
Dependencies: requests for core calls, sseclient-py for the Server-Sent Events stream. Swap in httpx if you prefer async.
Setup
pip install requests sseclient-pyEnvironment:
import os
AMDAHL_URL = os.environ["AMDAHL_URL"]
API_KEY = os.environ["API_KEY"]
BASE = f"{AMDAHL_URL}/api/platform/v1"The client
A single AmdahlClient class wraps auth, JSON handling, and typed errors. Every example below calls through it.
from __future__ import annotations
import json
import time
from dataclasses import dataclass
from typing import Any, Iterator, Optional
import requests
class AmdahlError(Exception):
def __init__(
self,
status: int,
code: str,
message: str,
body: Any = None,
) -> None:
super().__init__(f"[{status} {code}] {message}")
self.status = status
self.code = code
self.body = body
class AmdahlClient:
def __init__(
self,
base_url: str = BASE,
api_key: str = API_KEY,
timeout: float = 30.0,
) -> None:
self.base_url = base_url.rstrip("/")
self.session = requests.Session()
self.session.headers.update(
{
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json",
}
)
self.timeout = timeout
def request(
self,
method: str,
path: str,
*,
json_body: Any = None,
params: Optional[dict[str, Any]] = None,
) -> Any:
url = f"{self.base_url}{path}"
resp = self.session.request(
method,
url,
json=json_body,
params=params,
timeout=self.timeout,
)
text = resp.text
payload: Any = json.loads(text) if text else None
if not resp.ok:
err = (payload or {}).get("error", {}) if isinstance(payload, dict) else {}
raise AmdahlError(
status=resp.status_code,
code=err.get("code", "unknown"),
message=err.get("message", f"HTTP {resp.status_code}"),
body=payload,
)
return payload
# Shorthand verbs
def get(self, path: str, **kwargs: Any) -> Any:
return self.request("GET", path, **kwargs)
def post(self, path: str, json_body: Any = None) -> Any:
return self.request("POST", path, json_body=json_body)
def patch(self, path: str, json_body: Any = None) -> Any:
return self.request("PATCH", path, json_body=json_body)
def delete(self, path: str) -> Any:
return self.request("DELETE", path)Data queries
data.query runs read-only SQL against your interactions table, auto-scoped to your business. It is a POST /data/query and requires data:read.
def query_data(client: AmdahlClient, sql: str) -> dict[str, Any]:
return client.post("/data/query", json_body={"sql": sql})Usage:
client = AmdahlClient()
result = query_data(
client,
"SELECT channel, COUNT(*) AS events FROM interactions "
"GROUP BY channel ORDER BY events DESC LIMIT 5",
)
for row in result["rows"]:
print(row["channel"], row["events"])Pages
A Page is a workspace-authored data UI: a catalog-component spec plus named queries the host runs server-side. The CRUD verbs map onto the client below. create and update require pages:write; the reads require pages:read.
def list_pages(client: AmdahlClient) -> dict[str, Any]:
return client.get("/pages")
def get_page(client: AmdahlClient, page_id: str) -> dict[str, Any]:
return client.get(f"/pages/{page_id}")
def create_page(client: AmdahlClient, payload: dict[str, Any]) -> dict[str, Any]:
return client.post("/pages", json_body=payload)
def update_page(client: AmdahlClient, page_id: str, patch: dict[str, Any]) -> dict[str, Any]:
return client.patch(f"/pages/{page_id}", json_body=patch)
def archive_page(client: AmdahlClient, page_id: str) -> None:
client.delete(f"/pages/{page_id}")Usage:
client = AmdahlClient()
page = create_page(
client,
{
"name": "Weekly sentiment",
"tags": ["weekly", "sentiment"],
"declared_queries": [
{"name": "by_channel", "sql": "SELECT channel, COUNT(*) AS events FROM interactions GROUP BY channel"}
],
"spec": {
"type": "Section",
"children": [{"type": "Table", "data": {"$query": "by_channel"}}],
},
},
)
print(page["id"])Agents
Start, poll, resume, and a helper that blocks until a terminal (or paused) state.
TERMINAL = {"complete", "failed", "canceled"}
def start_agent(client: AmdahlClient, *, profile_id: str, task: str, input_params: Optional[dict[str, Any]] = None) -> dict[str, Any]:
return client.post(
"/agents/run",
json_body={
"profile_id": profile_id,
"task": task,
"input_params": input_params or {},
"async": True,
},
)
def get_agent_status(client: AmdahlClient, session_id: str) -> dict[str, Any]:
return client.get(f"/agents/{session_id}/status")
def resume_agent(
client: AmdahlClient, session_id: str, input_value: dict[str, Any]
) -> dict[str, Any]:
return client.post(
f"/agents/{session_id}/resume", json_body={"input": input_value}
)
def wait_for_agent(
client: AmdahlClient,
session_id: str,
*,
interval_s: float = 3.0,
timeout_s: float = 600.0,
) -> dict[str, Any]:
deadline = time.monotonic() + timeout_s
while True:
status = get_agent_status(client, session_id)
if status["status"] in TERMINAL or status["status"] == "awaiting_input":
return status
if time.monotonic() > deadline:
raise TimeoutError(f"agent {session_id} timed out")
time.sleep(interval_s)Stream progress (SSE)
sseclient-py parses the SSE framing; we open the request with stream=True.
from sseclient import SSEClient
def stream_agent(client: AmdahlClient, session_id: str) -> Iterator[dict[str, Any]]:
# Use the underlying session to keep auth; sseclient needs the raw response.
url = f"{client.base_url}/agents/{session_id}/stream"
response = client.session.get(url, stream=True, timeout=None)
response.raise_for_status()
sse = SSEClient(response)
for msg in sse.events():
if not msg.data:
continue
yield json.loads(msg.data)Usage:
for event in stream_agent(client, session_id):
print(event["type"], event.get("data"))
if event["type"] in {"session.completed", "session.failed"}:
breakIf you prefer httpx with streaming instead, the pattern is nearly identical:
import httpx
def stream_agent_httpx(session_id: str) -> Iterator[dict[str, Any]]:
headers = {"Authorization": f"Bearer {API_KEY}"}
with httpx.stream("GET", f"{BASE}/agents/{session_id}/stream", headers=headers, timeout=None) as r:
buffer = ""
for chunk in r.iter_text():
buffer += chunk
while "\n\n" in buffer:
frame, buffer = buffer.split("\n\n", 1)
for line in frame.splitlines():
if line.startswith("data: "):
yield json.loads(line[6:])Webhooks
Register and manage
def register_webhook(
client: AmdahlClient,
*,
url: str,
events: list[str],
description: Optional[str] = None,
) -> dict[str, Any]:
payload: dict[str, Any] = {"url": url, "events": events}
if description:
payload["description"] = description
return client.post("/webhooks", json_body=payload)
def list_webhooks(client: AmdahlClient) -> dict[str, Any]:
return client.get("/webhooks")
def update_webhook(
client: AmdahlClient, webhook_id: str, patch: dict[str, Any]
) -> dict[str, Any]:
return client.patch(f"/webhooks/{webhook_id}", json_body=patch)
def delete_webhook(client: AmdahlClient, webhook_id: str) -> None:
client.delete(f"/webhooks/{webhook_id}")HMAC verification
Use inside your receiver (Flask, FastAPI, etc.). Pass in the raw request bytes.
import hashlib
import hmac
def verify_webhook_signature(raw_body: bytes, signature_header: Optional[str], secret: str) -> bool:
if not signature_header:
return False
expected = hmac.new(secret.encode(), raw_body, hashlib.sha256).hexdigest()
return hmac.compare_digest(expected, signature_header)Minimal Flask receiver:
from flask import Flask, request, jsonify
app = Flask(__name__)
WEBHOOK_SECRET = os.environ["WEBHOOK_SECRET"]
@app.post("/hooks/amdahl")
def hook() -> Any:
raw = request.get_data()
sig = request.headers.get("X-Webhook-Signature")
if not verify_webhook_signature(raw, sig, WEBHOOK_SECRET):
return jsonify({"error": "invalid_signature"}), 401
event = json.loads(raw)
# Acknowledge fast, enqueue follow-up work.
# Example: enqueue_task(event)
return jsonify({"received": True}), 202Full end-to-end pattern: recipes/react-to-webhook.md.
Error handling pattern
Branch on AmdahlError.code for machine-readable cases; log .body for anything unexpected.
try:
start_agent(client, profile_id="content_writer", task="")
except AmdahlError as err:
if err.code == "invalid_argument":
print("bad input:", err)
elif err.status == 429:
retry_after = (err.body or {}).get("retry_after", 5)
print(f"rate limited; retry in {retry_after}s")
else:
print("amdahl error:", err.status, err.code, err.body)Full reference: api-reference/errors.md.
Retry helper
Retry 429 and 5xx with exponential backoff.
import random
def with_retry(fn, *, max_attempts: int = 4, base_s: float = 0.5):
last: Exception | None = None
for attempt in range(1, max_attempts + 1):
try:
return fn()
except AmdahlError as err:
last = err
retryable = err.status == 429 or err.status >= 500
if not retryable or attempt == max_attempts:
raise
delay = base_s * (2 ** (attempt - 1)) + random.random() * 0.2
time.sleep(delay)
if last:
raise lastPagination
Cursor-paginated list endpoints return next_cursor (null at the end). Wrap any of them in a generator like this; swap in the resource path and the array key it returns:
def iter_pages(client: AmdahlClient, items_key: str = "items") -> Iterator[dict[str, Any]]:
cursor: Optional[str] = None
while True:
params: dict[str, Any] = {"limit": 100}
if cursor:
params["cursor"] = cursor
page = client.get("/<list-endpoint>", params=params)
for row in page.get(items_key, []):
yield row
cursor = page.get("next_cursor")
if not cursor:
returnFull details: pagination-errors.md.
Tips
- Keep
API_KEYserver-side only. Never ship it to a browser or a mobile bundle. - Use
requests.Session(we already do) so connections are pooled. EachAmdahlClientkeeps one open. - Set a timeout on every call. The client defaults to 30s; streaming calls pass
timeout=Nonedeliberately because SSE holds the connection open. - Prefer async-start + polling or SSE over
async=False. The 60s sync wait often times out. - Check
response.headersfor rate-limit metadata.X-RateLimit-RemainingandX-RateLimit-Resetare in the envelope Supabase-layer headers. See rate-limits.md. - Pin the API version. Send
X-Amdahl-API-Versionas described in versioning.md so a server upgrade never breaks your deploy.
See also
- quickstart.md for a copy-paste first call.
- sdk/curl.md for the raw HTTP equivalent.
- sdk/typescript.md for the Node.js equivalent.
- recipes/run-research.md for an end-to-end research flow using these helpers.
- api-reference/README.md for the full endpoint catalog.