Hello, connector
This tutorial walks you end-to-end through the work of adding a new data source to AiSOC. By the end you will have:
- A subclass of
BaseConnectorthat implements every method the platform calls. - A self-describing schema that the connector wizard renders into a configuration form.
- A
test_connection()path that the wizard hits before it lets you save. - A
fetch_alerts()path that the polling scheduler hits every five minutes. - A
normalize()step that converts vendor JSON into AiSOC's common alert shape. - A smoke test that pins all of the above against mocked HTTP traffic.
Everything points at httpbin.org, a free, no-auth HTTP testing service. The point isn't to ingest httpbin events into your SOC — the point is to walk every line of the connector contract against a backend that:
- responds without authentication,
- returns deterministic JSON shapes you can assert on,
- has been up roughly forever.
When you're ready to wire up a real vendor, copy the example, swap the URLs, and add an authentication step. The shape stays the same.
Where the example lives
The reference implementation lives at:
services/connectors/app/connectors/_examples/hello_connector.py
The _examples/ directory is deliberately not registered in services/connectors/app/connectors/__init__.py. Anything under _examples/ exists for documentation only and never appears in the connector catalog, the polling scheduler, or the marketplace index. There is a smoke test (tests/test_hello_connector_example.py) that pins this invariant — if someone accidentally promotes the example into the live registry, CI fails.
Step 1 — Pick identity attributes
A connector identifies itself with three class-level strings:
class HelloConnector(BaseConnector):
connector_id = "hello"
connector_name = "Hello (Tutorial)"
connector_category = "saas"
connector_idis the wire identifier. It shows up in database rows, plugin manifests, and API URLs. Treat it like a primary key: lowercase, hyphen- or underscore-separated, never changes after first release.connector_nameis the human label rendered in the catalog grid.connector_categorymust be one of the existing categories:identity,cloud,vcs,siem,edr,xdr,network,posture,saas. The category drives grouping in the wizard and downstream routing hints in detection rules. If your tool genuinely doesn't fit one of these, raise it in a PR before inventing a new one — the taxonomy is shared with the detection layer.
Step 2 — Declare the schema
schema() is what the connector wizard renders. It tells the UI which fields to draw, which ones are secret, and where to find the docs page (which is this page, in our case).
@classmethod
def schema(cls) -> ConnectorSchema:
return ConnectorSchema(
connector_id=cls.connector_id,
connector_name=cls.connector_name,
category=cls.connector_category,
description=(
"Tutorial connector that polls httpbin.org. Use this as a "
"reference when writing a new connector — it implements "
"every BaseConnector method against a public, no-auth API."
),
docs_url="/docs/connectors/hello-connector",
fields=[
Field(
"api_token",
"secret",
"API Token",
help_text=(
"Pretend credential. httpbin doesn't actually "
"validate it — this field exists so the tutorial "
"exercises the credential vault."
),
),
Field(
"base_url",
"string",
"Base URL",
required=False,
default="https://httpbin.org",
placeholder="https://httpbin.org",
help_text="Override only when running against a self-hosted httpbin.",
),
],
)
A few things worth pointing at:
type="secret"onapi_tokendoes two things: the wizard renders a masked input, and the credential vault encrypts the value withFernetbefore it touches the database. The tutorial declares the field as a secret even though httpbin ignores it — the goal is to exercise that code path so you can see exactly what happens with a real vendor.required=Falseplusdefault=...onbase_urlmatches a common pattern for self-hosted vendor support. The wizard shows the default in placeholder text and skips validation if the field is left blank.docs_urlis the link the wizard surfaces as "Documentation" in the connector card. Always point it at the per-connector page, not the catalog.
Step 3 — Declare capabilities
Capabilities are the set of action verbs the agent layer is allowed to ask the connector to perform. The tutorial connector is read-only, so it declares one:
@classmethod
def capabilities(cls) -> tuple[Capability, ...]:
return (Capability.PULL_ALERTS,)
Real connectors will frequently add things like PULL_AUDIT, PIVOT_USER, or READ_AUDIT_TRAIL. Anything kinetic (isolating a host, blocking a hash, deleting a session) requires a real backend that can act on those verbs and is out of scope for this example by design — the goal is to not give a brand-new contributor a half-implemented "isolate host" path that they then ship by mistake.
Step 4 — Constructor
The constructor must match the schema field names exactly. The router decrypts auth_config and unpacks it as keyword arguments, so a typo here turns into a 500 at poll time:
def __init__(self, api_token: str, base_url: str | None = None):
self._api_token = api_token
# Strip trailing slash so we can compose URLs with simple `+`.
self._base_url = (base_url or "https://httpbin.org").rstrip("/")
We strip trailing slashes so URL composition stays boring (f"{self._base_url}/anything" always works).
Step 5 — test_connection()
test_connection() is the pre-save liveness check. It is called by POST /connectors/test before the row is written to the database. Returning success: false here blocks the save, so keep this cheap and deterministic — one round trip, short timeout, no pagination:
async def test_connection(self) -> dict[str, Any]:
try:
async with httpx.AsyncClient(timeout=10.0) as client:
resp = await client.get(f"{self._base_url}/status/200")
except httpx.HTTPError as exc:
return {
"success": False,
"connector": self.connector_id,
"error": f"network error: {exc.__class__.__name__}",
}
if resp.status_code != 200:
return {
"success": False,
"connector": self.connector_id,
"error": f"unexpected status: HTTP {resp.status_code}",
}
return {"success": True, "connector": self.connector_id}
Three rules to internalize for test_connection():
- Always return a dict. Never raise. The wizard uses the
successflag to decide whether to render the error inline; an unhandled exception bubbles up as a 500 and the operator sees a useless "request failed" toast. - Always include
connector: self.connector_id. Multiple connectors can be tested in parallel; the response key lets the UI route the result to the right card. - Categorise failures.
network error: ConnectErroris a different problem fromunexpected status: HTTP 401. The first is an infrastructure issue; the second is a credential issue. Surface the difference so the operator knows whether to call IT or the vendor.
Step 6 — fetch_alerts()
This is the polling read path. The scheduler calls it every poll_interval_seconds (default 300) with the elapsed window since the last successful poll. Real connectors translate since_seconds into a vendor-native time filter (Auth0 uses a Lucene query string, Cloudflare uses since=<ISO-8601>, GitHub uses cursor pagination, etc.).
httpbin doesn't store events, so the example just hits /anything once and treats the response body as a single synthetic event:
async def fetch_alerts(self, since_seconds: int = 300) -> list[dict[str, Any]]:
try:
async with httpx.AsyncClient(timeout=15.0) as client:
resp = await client.get(
f"{self._base_url}/anything",
params={"since_seconds": since_seconds},
)
except httpx.HTTPError as exc:
logger.warning("hello.fetch_alerts.network_error", error=str(exc))
return []
if resp.status_code != 200:
logger.warning(
"hello.fetch_alerts.bad_status",
status=resp.status_code,
body=resp.text[:300],
)
return []
try:
payload = resp.json() or {}
except ValueError:
logger.warning("hello.fetch_alerts.invalid_json")
return []
return [self.normalize(payload)]
The most important thing here is the failure model: polling failures must be non-terminal. Network errors, 5xx responses, and bad JSON all return [] rather than raising. The scheduler logs the failure, records it on health_status, surfaces it in the UI on the connector card, and tries again next interval. If fetch_alerts() raises, the entire job slot is poisoned — you'll see the connector go red until the next reload (up to 30 seconds), and any other connectors sharing that thread will be delayed.
Keep the since_seconds parameter in the signature even if your vendor doesn't expose a time-window filter — the scheduler always passes it.
Step 7 — normalize()
normalize() maps vendor JSON to AiSOC's common alert shape:
def normalize(self, raw: dict[str, Any]) -> dict[str, Any]:
url = raw.get("url") or "(no url)"
return {
"source": "hello",
"category": self.connector_category,
"severity": "info",
"title": f"Hello connector: pinged {url}",
"description": "Synthetic event from the hello-connector tutorial.",
"alert_id": f"hello:{raw.get('origin', '?')}:{url}",
"host": None,
"raw": raw,
}
The contract for the returned dict is documented in services/ingest and includes (at minimum):
| Field | Notes |
|---|---|
source | Stable string, almost always cls.connector_id. |
category | Same taxonomy as the schema. Detection rules match against this. |
severity | One of info, low, medium, high. Four tiers, not five. Vendor 5-tier ladders (Azure, SCC, etc.) collapse here. |
title | Short, human-readable. Operators read this in a list. |
description | Optional context. Can be the raw event description, the actor, the IP, etc. |
alert_id | Stable identifier for de-duplication. Use the vendor's ID when one exists; otherwise build a deterministic hash from event content. |
host | Hostname or IP if relevant; None if not. |
raw | Always include the original payload. Detection rules and the explain endpoint both rely on it. |
A few patterns worth copying:
severityis hard-codedinfoin the example because httpbin doesn't carry a severity signal. Real connectors derive severity from the vendor's risk score, status code, anomaly flag, etc. — see the Auth0 connector for an example of a simple rule-based mapping.alert_idshould be stable across re-fetches when the vendor exposes a unique ID. httpbin doesn't, so the example cheats: it usesorigin + urlas a poor-but-stable hash. If you do this, document it — the next person to look will assume you forgot.rawis forensics. Don't summarise it, don't truncate it, don't drop fields you didn't recognise. The detection layer and the explain endpoint both treat it as the source of truth for "what actually happened".
Step 8 — Pin the contract with a smoke test
The example is paired with a smoke test at services/connectors/tests/test_hello_connector_example.py. It uses respx (already a dev dependency in services/connectors/pyproject.toml) to mock httpx and exercise every method end-to-end:
@respx.mock
@pytest.mark.asyncio
async def test_test_connection_success():
respx.get("https://httpbin.org/status/200").mock(return_value=httpx.Response(200))
conn = HelloConnector(api_token="t")
result = await conn.test_connection()
assert result == {"success": True, "connector": "hello"}
The test file pins three different things, in order of importance:
HelloConnectoris not in the registry. This catches the "I copied the example into the catalog by accident" failure mode.- The schema shape doesn't drift. Field names, secret flags, default values, and the docs URL are all asserted explicitly. If a tutorial reader copies the example and changes one of these without thinking, the test tells them.
- Every method behaves the way the tutorial says it does. Happy path, bad status, network error, invalid JSON, and the
since_secondsquery parameter all have dedicated cases.
When you write your own connector, copy the test layout. The volume isn't large — fifteen short tests cover the entire surface — and the cost of not having them is that you ship a connector that silently regresses six months later when a vendor changes their JSON shape.
Run the suite locally:
cd services/connectors
.venv/bin/python -m pytest tests/test_hello_connector_example.py -v
You should see 15 tests pass in well under a second.
Graduating from _examples/ to a real connector
When you're ready to ship a real connector, here is the exact checklist:
- Move the file. From
services/connectors/app/connectors/_examples/<your_connector>.pytoservices/connectors/app/connectors/<your_connector>.py. - Register it. Add the class to
_CONNECTOR_CLASSESinservices/connectors/app/connectors/__init__.py. Keep the tuple alphabetised byconnector_idto keep diffs predictable. Add the class name to the__all__list at the bottom of the same file. - Add a marketplace manifest. Create
plugins/<connector-id>/plugin.yamlmirroring yourschema(). The fields and secret flags must match. Useplugins/auth0/plugin.yamlas a template. - Sync the marketplace. Run
pnpm marketplace:syncfrom the repo root. This regenerates the static catalog underapps/web/public/marketplace/so the catalog grid in the UI picks up the new entry. - Write a docs page. Add
apps/docs/docs/connectors/<your-connector>.mdand wire it intoapps/docs/sidebars.tsunder theConnectorscategory. Useapps/docs/docs/connectors/cloudflare.mdas a template —What you gettable →Prerequisites→Setup walkthrough→Polling details→Severity heuristics→Troubleshooting. - Add real tests. Use
respxto mock the vendor API. At minimum, exercisetest_connection()(success + auth failure + network failure),fetch_alerts()(happy path + empty + 5xx + auth expiry), andnormalize()(every severity branch). - Open a PR. The code review will check that schema fields are wizard-renderable, that secrets are marked
secret=True, that the category fits the existing taxonomy, that polling failures are non-terminal, and that the docs page exists.
That's the full path. If any of those steps feels heavier than you expected, file a bug — the goal is for adding a connector to feel mechanical, not heroic.
Related
- Connectors overview — the catalog, polling architecture, and category taxonomy.
- Credential vault — what
secret=Trueactually does at the database layer. - Plugin SDK overview — for connectors distributed outside the monorepo.
- Contributing guidelines — broader expectations for PRs that touch
services/connectors/.