AI agents are becoming a new interface for handling inbound messages.
Instead of opening an inbox, scanning dozens of conversations, deciding which ones matter, and manually typing replies, a user can now say something like:
Check my recent messages for new leads.
In Opportunity Skill, the user's AI agent reads all recent messages across every space the user belongs to, evaluates each conversation, and makes a four-way decision: reply directly, compact the context and reply in a new chat, do nothing for now, or quit the space entirely if it is just marketing spam.
This post is a technical walkthrough of the backend functions behind that flow.
The interesting part is not just "read messages and reply". The pipeline combines:
- PostgreSQL
- Two-layer message ingestion (summary first, deep-dive on demand)
- Server-enforced rate limiting with
LATERAL JOIN - Dual write paths for context-aware reply routing
- Atomic space exit with identity rotation
- Trigger-mirrored member tables for query optimization
- Privacy-aware member enumeration
The goal is simple:
Given a request from an AI agent, return exactly the messages that matter, provide enough context for the agent to make a decision, and give it the right tools to act — all while enforcing safety and privacy guarantees on the server side.
What Opportunity Skill does
Opportunity Skill is an Agent Skill that makes a user discoverable to other agents and helps them capture opportunities. It has four interconnected processes:
The Lead Engagement process is the passive filtering pipeline. While the Search and Contact process proactively finds people, Lead Engagement handles the messages that have already arrived.
The agent analyses all recent messages and classifies each conversation into one of four actions:
| Scenario | Action |
|---|---|
| Worth following up, fewer than 10 messages | Reply in the current chat |
| Worth following up, 10 or more messages | Compact the context and create a new chat with a reply |
| Not suitable for follow-up yet | Take no action for now |
| Completely irrelevant (e.g., marketing spam) | Quit the space to avoid being disturbed |
This classification is not arbitrary. The 10-message threshold exists because long chat threads accumulate noise. An AI agent reading a 50-message thread must parse through greetings, tangents, and outdated context before finding the signal. By forking into a new chat at the 10-message mark, the agent compresses the history into a compact summary and starts fresh — a design choice we will revisit later.
The agent-side entry points
The Lead Engagement process exposes five functions to the AI agent. They are implemented in the skill's scripts/callable_functions.py file and communicate with QuestMeet through GraphQL.
Reading functions
def ai_read_messages(access_token: str, lookback_window: int = None) -> Union[list, bool, None]:
try:
response = httpx.post(
BASE_URL,
json={
"query": """
query AiReadMessages($lookbackWindow: Int) {
aiReadMessages(lookbackWindow: $lookbackWindow)
}
""",
"variables": {"lookbackWindow": lookback_window},
},
headers={"Authorization": f"Bearer {access_token}"},
trust_env=False,
timeout=20,
)
return response.json()["data"]["aiReadMessages"]
except Exception:
return False
def ai_read_chat_messages(access_token: str, space_id: str, chat_id: str) -> Union[dict, bool, None]:
try:
response = httpx.post(
BASE_URL,
json={
"query": """
query AiReadChatMessages($spaceId: BigInt!, $chatId: BigInt!) {
aiReadChatMessages(spaceId: $spaceId, chatId: $chatId)
}
""",
"variables": {"spaceId": space_id, "chatId": chat_id},
},
headers={"Authorization": f"Bearer {access_token}"},
trust_env=False,
timeout=20,
)
return response.json()["data"]["aiReadChatMessages"]
except Exception:
return False
Writing functions
def ai_create_message(access_token: str, space_id: str, chat_id: str, content: str) -> Union[bool, None]:
try:
response = httpx.post(
BASE_URL,
json={
"query": """
mutation AiCreateMessage($spaceId: BigInt!, $chatId: BigInt!, $content: String!) {
aiCreateMessage(spaceId: $spaceId, chatId: $chatId, content: $content)
}
""",
"variables": {"spaceId": space_id, "chatId": chat_id, "content": content},
},
headers={"Authorization": f"Bearer {access_token}"},
trust_env=False,
timeout=20,
)
return response.json()["data"]["aiCreateMessage"]
except Exception:
return False
def ai_create_chat_and_message(access_token: str, space_id: str, content: str) -> Union[bool, None]:
try:
response = httpx.post(
BASE_URL,
json={
"query": """
mutation AiCreateChatAndMessage($spaceId: BigInt!, $content: String!) {
aiCreateChatAndMessage(spaceId: $spaceId, content: $content)
}
""",
"variables": {"spaceId": space_id, "content": content},
},
headers={"Authorization": f"Bearer {access_token}"},
trust_env=False,
timeout=20,
)
return response.json()["data"]["aiCreateChatAndMessage"]
except Exception:
return False
Cleanup function
def ai_quit_spaces(access_token: str, space_ids: list[str]) -> Union[bool, None]:
try:
response = httpx.post(
BASE_URL,
json={
"query": """
mutation AiQuitSpaces($spaceIds: [BigInt!]!) {
aiQuitSpaces(spaceIds: $spaceIds)
}
""",
"variables": {"spaceIds": space_ids},
},
headers={"Authorization": f"Bearer {access_token}"},
trust_env=False,
timeout=20,
)
return response.json()["data"]["aiQuitSpaces"]
except Exception:
return False
Return value semantics
All five functions share the same return value contract:
| Return value | Meaning |
|---|---|
list[dict] / dict / True
|
The operation succeeded |
[] |
The read succeeded, but no relevant messages were found |
None |
The access token is missing or expired; the agent should re-authenticate |
False |
Something failed; notify the user and stop |
This contract is critical because the agent, not the server, owns the workflow. If the token is expired, the skill instructs the agent to run the sign-in process, obtain a new token, and retry. The server never attempts to redirect or refresh — it simply returns None and lets the agent decide.
The data model
The Lead Engagement functions touch five tables:
usersspaces-
members/copy_members chatsmessages
Here is the simplified relationship:
A few things to notice before we dive into the code.
First, members and copy_members are kept in sync by a PostgreSQL trigger. members is partitioned by user_id, which makes "find all spaces for a given user" fast. copy_members is partitioned by space_id, which makes "find all members in a given space" fast. The trigger copies every insert, update, and delete so that both query patterns hit the right partition.
Second, chats is partitioned by space_id and messages is partitioned by chat_id. This means fetching all messages in a chat stays within a single partition, and fetching all chats in a space does the same.
Third, spaces has an is_shadow flag. Shadow spaces are internal system constructs that should never appear in agent-facing results. Every query in the Lead Engagement pipeline filters them out. In QuestMeet's multi-node deployment, users, spaces, and chats must maintain consistent object counts across all regions to satisfy foreign-key constraints; an is_shadow = TRUE row holds no actual user, space, or chat data and exists purely to anchor partitioned records like messages that belong to a different node.
The GraphQL entry points
On the server side, the five public fields are implemented as Strawberry GraphQL resolvers. Here is the full signature block:
@strawberry.field
async def ai_read_messages(self, info: Info, lookback_window: Optional[int] = None) -> Optional[JSON]:
...
@strawberry.field
async def ai_read_chat_messages(self, info: Info, space_id: BigInt, chat_id: BigInt) -> Optional[JSON]:
...
@strawberry.mutation
async def ai_create_message(self, info: Info, space_id: BigInt, chat_id: BigInt, content: str) -> Optional[bool]:
...
@strawberry.mutation
async def ai_create_chat_and_message(self, info: Info, space_id: BigInt, content: str) -> Optional[bool]:
...
@strawberry.mutation
async def ai_quit_spaces(self, info: Info, space_ids: List[BigInt]) -> Optional[bool]:
...
The read functions are queries. The write functions are mutations. All of them share the same auth guard pattern and return None when unauthenticated. Let us walk through each one.
Step 1: Auth guard
Every function begins with the same check:
if user_id := info.context["user_id"]:
# ... actual logic ...
return ...
return None
In the QuestMeet GraphQL service, info.context["user_id"] is populated after the access token is verified by middleware. If it is missing, the function returns None.
The server does not attempt to redirect, refresh, or retry. It only tells the agent:
You are not authenticated for this operation.
The skill then instructs the agent to run the sign-in process again, store the new access token in long-term memory or a local .txt file, and retry the original process. This keeps the backend simple and makes the agent responsible for workflow recovery — the same pattern used in the Search and Contact module.
Step 2: Batch message ingestion (ai_read_messages)
This is the entry point for the entire Lead Engagement flow. The agent calls it with an optional lookback_window (in seconds) and receives a compact summary of all recent activity across all spaces.
The server-side implementation has three phases.
Phase 1: Collect space IDs
async with info.context["db_pool"].acquire() as conn:
space_ids = [row["space_id"] for row in await read_user_space_ids(conn, user_id)]
The helper function is straightforward:
async def read_user_space_ids(conn: asyncpg.Connection, user_id: int) -> list[dict]:
query = """
SELECT space_id
FROM members
WHERE user_id = $1
"""
rows = await conn.fetch(query, user_id)
return [dict(row) for row in rows]
Because members is partitioned by user_id, this query hits exactly one partition — the one containing the current user's memberships.
Phase 2: Read spaces and filter shadows
rows = await read_spaces(conn, space_ids) if space_ids else []
The helper reads space metadata:
async def read_spaces(conn: asyncpg.Connection, space_id_list: list[int]) -> list[dict]:
query = f"""
SELECT space_id, name, description, lock_space_name, lock_space_description,
pause_new_invitations, updated_at, is_shadow
FROM spaces
WHERE {' OR '.join(f'(space_id = ${i+1})' for i in range(len(space_id_list)))}
"""
rows = await conn.fetch(query, *space_id_list)
return [dict(row) for row in rows]
ai_read_messages consumes only space_id and is_shadow from the result set returned by read_spaces. The shadow filter is applied immediately:
for space in [space for space in rows if not space["is_shadow"]]:
Shadow spaces are excluded before any chat or message query runs. This is a cheap filter — it happens in Python over an already-small list of spaces.
Phase 3: Find active chats and collect messages
For each non-shadow space, the server finds chats that meet four conditions:
if lookback_window:
query = """
SELECT chat_id
FROM chats
WHERE space_id = $1 AND shared = TRUE AND is_shadow = FALSE
AND EXISTS (
SELECT 1
FROM messages
WHERE messages.space_id = chats.space_id
AND messages.chat_id = chats.chat_id
AND humans_only = FALSE
AND created_by != $2
AND created_at > NOW() - ($3 * INTERVAL '1 second')
)
"""
else:
query = """
SELECT chat_id
FROM chats
WHERE space_id = $1 AND shared = TRUE AND is_shadow = FALSE
AND EXISTS (
SELECT 1
FROM messages
WHERE messages.space_id = chats.space_id
AND messages.chat_id = chats.chat_id
AND humans_only = FALSE
AND created_by != $2
)
"""
Let us unpack the four conditions.
shared = TRUE: Only shared chats are visible to AI agents. Private chats between two humans are excluded. This is a design decision — the agent should only process conversations that are explicitly meant to be collaborative and discoverable.
is_shadow = FALSE: Shadow chats, like shadow spaces, are internal system artifacts. They never appear in agent results.
humans_only = FALSE: Messages marked humans_only = TRUE are explicitly flagged as human-only content. The agent must not read them. This check appears in the EXISTS subquery, meaning a chat is only included if it has at least one non-human-only message from someone else.
created_by != $2: The subquery excludes messages created by the current user. The agent is looking for inbound communication — what other people said to the user. Including the user's own messages would add noise without adding signal.
When lookback_window is provided, a fifth condition applies:
created_at > NOW() - ($3 * INTERVAL '1 second'): Only messages within the time window are considered. The SKILL.md recommends a default of 86400 seconds (24 hours), but the agent can adjust this based on the user's needs.
For each qualifying chat, the server collects messages:
results.append({
"space_id": space["space_id"],
"chats": [
{
"chat_id": chat["chat_id"],
"messages": await list_chat_messages(conn, chat["chat_id"], lookback_window),
}
for chat in rows
],
"members": await list_space_members(conn, space["space_id"], user_id),
})
The list_chat_messages helper applies the same humans_only and time-window filters:
async def list_chat_messages(conn: asyncpg.Connection, chat_id: int, lookback_window: Optional[int] = None) -> list[dict]:
if lookback_window:
query = """
SELECT messages.content, members.alias, messages.created_at
FROM messages
LEFT JOIN members ON messages.created_by = members.user_id
AND messages.space_id = members.space_id
WHERE messages.chat_id = $1
AND messages.humans_only = FALSE
AND messages.created_at > NOW() - ($2 * INTERVAL '1 second')
ORDER BY messages.created_at
"""
rows = await conn.fetch(query, chat_id, lookback_window)
else:
query = """
SELECT messages.content, members.alias, messages.created_at
FROM messages
LEFT JOIN members ON messages.created_by = members.user_id
AND messages.space_id = members.space_id
WHERE messages.chat_id = $1
AND messages.humans_only = FALSE
ORDER BY messages.created_at
"""
rows = await conn.fetch(query, chat_id)
messages = [dict(row) for row in rows]
for message in messages:
message["created_at"] = message["created_at"].isoformat()
return messages
Each message includes three fields: content, the sender's alias, and created_at (serialized to ISO 8601). The alias comes from the members table — it is the per-space display name, not the global username. This gives the agent context-specific identity information.
Why two layers?
You might wonder: why not return all messages from all chats in one query?
The answer is bandwidth and token efficiency. A user might belong to 20 spaces, each with 5 active chats, each with 50 messages. That is 5,000 messages. An AI agent processing that much text would burn through its context window before reaching any decisions.
By returning only messages within the lookback_window, the agent gets a manageable snapshot. If the snapshot is insufficient to evaluate a particular chat — for example, the recent messages are all "Thanks" and "OK" while the real substance is older — the agent can call ai_read_chat_messages for a deep dive.
This two-layer design is not an optimization. It is a necessity for agent-driven workflows.
Step 3: Deep-dive chat retrieval (ai_read_chat_messages)
When the agent needs full context for a specific chat, it calls:
@strawberry.field
async def ai_read_chat_messages(self, info: Info, space_id: BigInt, chat_id: BigInt) -> Optional[JSON]:
if user_id := info.context["user_id"]:
try:
if await is_member(info, space_id, user_id) and await is_shared_chat(info, space_id, chat_id):
async with info.context["db_pool"].acquire() as conn:
return {
"space_id": space_id,
"chat": {
"chat_id": chat_id,
"messages": await list_chat_messages(conn, chat_id),
},
"members": await list_space_members(conn, space_id, user_id),
}
return False
except Exception:
return False
return None
This function has two explicit permission checks before any data is returned.
is_member
async def is_member(info: Info, space_id: int, user_id: int) -> bool:
query = """
SELECT EXISTS(
SELECT 1
FROM copy_members
WHERE space_id = $1 AND user_id = $2
)
"""
async with info.context["db_pool"].acquire() as conn:
return await conn.fetchval(query, space_id, user_id)
The agent can only read messages from spaces the user actually belongs to. This check uses copy_members because the query is space-scoped — "is this user in this space?" — which aligns with copy_members's space_id-based partitioning.
is_shared_chat
async def is_shared_chat(info: Info, space_id: int, chat_id: int) -> bool:
query = """
SELECT EXISTS(
SELECT 1
FROM chats
WHERE space_id = $1 AND chat_id = $2 AND shared = TRUE
)
"""
async with info.context["db_pool"].acquire() as conn:
return await conn.fetchval(query, space_id, chat_id)
Even if the user is a member of the space, they can only read shared chats through the agent API. Private chats are excluded.
If either check fails, the function returns False — not None. None means "re-authenticate". False means "you do not have permission, and retrying will not help". The agent should notify the user and stop.
When both checks pass, the function calls list_chat_messages without a lookback_window, returning the full message history. The response structure mirrors ai_read_messages: space_id, chat with messages, and members.
Step 4: Member enumeration with privacy (list_space_members)
Both ai_read_messages and ai_read_chat_messages include a members field in their response. This field is populated by:
async def list_space_members(conn: asyncpg.Connection, space_id: int, user_id: int) -> list[dict]:
query = """
SELECT user_id, alias, avatar, description
FROM copy_members
WHERE space_id = $1
"""
rows = await conn.fetch(query, space_id)
return [
{"alias": f"{row['alias']} (your user)"}
if row["user_id"] == user_id
else {"alias": row["alias"], "avatar": row["avatar"], "description": row["description"]}
for row in rows
]
There is a subtle privacy design here.
For every member except the current user, the function returns alias, avatar, and description — the public-facing profile fields within that space.
For the current user, it returns only alias with the suffix (your user). The avatar and description are omitted.
Why? Because the agent already knows who its user is. Sending the user's own avatar and description back to the agent is redundant at best and, in some architectures, could leak the user's self-description into agent logs or third-party LLM contexts. By stripping these fields, the server applies a minimal disclosure principle: only return what the agent actually needs to distinguish other members.
The query targets copy_members rather than members because it is space-scoped — "give me all members of this space" — which aligns with copy_members's space_id-based partitioning.
Step 5: Server-enforced rate limiting (within_rate_limit)
Before the agent can write a message, the server checks a rate limit:
async def within_rate_limit(info: Info, space_id: int, user_id: int) -> bool:
query = """
SELECT COALESCE((
SELECT NOT (
last_message.created_by = $2
AND last_message.created_at > NOW() - INTERVAL '600 seconds'
)
FROM chats
JOIN LATERAL (
SELECT created_by, created_at
FROM messages
WHERE messages.space_id = chats.space_id
AND messages.chat_id = chats.chat_id
ORDER BY created_at DESC
LIMIT 1
) AS last_message ON TRUE
WHERE chats.space_id = $1 AND chats.shared = TRUE
ORDER BY last_message.created_at DESC
LIMIT 1
), TRUE)
"""
async with info.context["db_pool"].acquire() as conn:
return await conn.fetchval(query, space_id, user_id)
This query deserves a close reading.
The LATERAL JOIN pattern
For each shared chat in the space, the lateral subquery fetches the single most recent message:
SELECT created_by, created_at
FROM messages
WHERE messages.space_id = chats.space_id
AND messages.chat_id = chats.chat_id
ORDER BY created_at DESC
LIMIT 1
The LATERAL keyword allows this subquery to reference chats.space_id and chats.chat_id from the outer query. Without it, the subquery would be independent and could not correlate per chat.
Finding the most recent message across all chats
The outer query then takes these per-chat latest messages and finds the single most recent one across the entire space:
ORDER BY last_message.created_at DESC
LIMIT 1
The rate limit check
The SELECT NOT (...) expression returns FALSE if the most recent message anywhere in that space — across all its shared chats — was both created by the current user and created within the last 600 seconds (10 minutes). Otherwise, it returns TRUE.
The COALESCE fallback
If the space has no shared chats or no messages at all, the subquery returns NULL. COALESCE(..., TRUE) ensures the rate limit check passes in that case — an empty space should not block the first message.
Why enforce this on the server?
The agent is instructed by SKILL.md to batch its replies and send them in parallel. But instructions are not guarantees. A buggy agent script, a misconfigured recurring task, or an adversarial prompt injection could cause the agent to flood a space with messages.
By enforcing the 600-second cooldown in the GraphQL resolver — before any write touches the database — the server guarantees that no agent can post a second consecutive message in the same space within 10 minutes of its last one, though it may reply immediately once another participant has responded. This is defense in depth: the agent is trusted to behave well, but the server does not rely on that trust.
Step 6: Reply routing — two write paths
Once the agent has evaluated all conversations and the user has confirmed the action plan, the agent writes replies. There are two paths.
Path A: Direct reply (ai_create_message)
@strawberry.mutation
async def ai_create_message(self, info: Info, space_id: BigInt, chat_id: BigInt, content: str) -> Optional[bool]:
if user_id := info.context["user_id"]:
try:
if await is_member(info, space_id, user_id) \
and await is_shared_chat(info, space_id, chat_id) \
and await within_rate_limit(info, space_id, user_id):
if await push_create_message(info, chat_id, space_id, content,
None, None, False, user_id, None):
return True
return False
except Exception:
return False
return None
Three guards before the write:
-
is_member: The user must belong to the space. -
is_shared_chat: The target chat must be shared. -
within_rate_limit: The 600-second cooldown must have elapsed.
If all three pass, the function calls push_create_message, which internally calls:
async def create_message(conn: asyncpg.Connection, chat_id: int, space_id: int,
content: str, reasoning: Optional[str], images: Optional[list[str]],
humans_only: bool, created_by: Optional[int], answered_by: Optional[str],
cum_input_tokens: int, input_cost: Optional[float],
cum_output_tokens: int, output_cost: Optional[float]) -> dict:
query = """
INSERT INTO messages (chat_id, space_id, content, reasoning, images,
humans_only, created_by, answered_by,
cum_input_tokens, input_cost, cum_output_tokens, output_cost)
VALUES ($1, $2, $3, $4, $5::jsonb, $6, $7, $8, $9, $10, $11, $12)
RETURNING *
"""
row = await conn.fetchrow(query, chat_id, space_id, content, reasoning, images,
humans_only, created_by, answered_by,
cum_input_tokens, input_cost, cum_output_tokens, output_cost)
return dict(row)
For agent-created messages, humans_only is always False, reasoning and images are None, and token accounting fields are zero. The message is inserted and the full row is returned.
Path B: Context compression (ai_create_chat_and_message)
@strawberry.mutation
async def ai_create_chat_and_message(self, info: Info, space_id: BigInt, content: str) -> Optional[bool]:
if user_id := info.context["user_id"]:
try:
if await is_member(info, space_id, user_id) \
and await within_rate_limit(info, space_id, user_id):
if chat := await push_create_chat(info, space_id, content[:20] + "...", True, user_id):
if await push_create_message(info, chat["chat_id"], space_id, content,
None, None, False, user_id, None):
return True
return False
except Exception:
return False
return None
This path is used when the current chat has 10 or more messages. Instead of appending to a long thread, the server:
- Creates a new chat with a name derived from the first 20 characters of the content.
- Creates the reply message in that new chat.
The internal create_chat function:
async def create_chat(conn: asyncpg.Connection, space_id: int, name: str,
shared: bool, created_by: int) -> dict:
query = """
INSERT INTO chats (space_id, name, shared, created_by)
VALUES ($1, $2, $3, $4)
RETURNING chat_id, space_id, name, shared, created_by, updated_at, is_shadow
"""
row = await conn.fetchrow(query, space_id, name, shared, created_by)
return dict(row)
The new chat is always shared = TRUE so that the other party's agent can also access it.
Why fork at 10 messages?
This threshold is not arbitrary. It is a context management strategy designed for AI agents.
When an agent reads a chat with 50 messages, it must process all 50 to understand the conversation. Many of those messages are noise: greetings, acknowledgments, off-topic tangents. The agent's context window fills up with low-signal text, leaving less room for reasoning about the actual opportunity.
By forking into a new chat, the agent is expected to include a compaction of the previous conversation in its reply. The SKILL.md explicitly instructs:
The reply must include a compaction of the messages in the current chat before the reply, ensuring that the key takeaways about the lead can be obtained without referencing other chats in the space.
This means the new chat starts with a clean slate, but the first message carries a compressed summary of everything that came before. The other party's agent can read that single message and understand the full context without crawling through the old thread.
It is a simple but effective pattern: the server provides the fork mechanism, and the agent provides the compression.
Step 7: Space exit and identity rotation (ai_quit_spaces)
The final function lets the agent clean up low-value spaces:
@strawberry.mutation
async def ai_quit_spaces(self, info: Info, space_ids: List[BigInt]) -> Optional[bool]:
if user_id := info.context["user_id"]:
try:
async with info.context["db_pool"].acquire() as conn:
async with conn.transaction():
await delete_members(conn, user_id, space_ids)
await update_candidate_id(conn, user_id)
return True
except Exception:
return False
return None
Two operations run inside a single database transaction, ensuring that deleting the memberships and rotating the candidate IDs succeed or fail together.
Delete memberships
async def delete_members(conn: asyncpg.Connection, user_id: int, space_id_list: list[int]) -> bool:
query = f"""
DELETE FROM members
WHERE {' OR '.join(f"(user_id = $1 AND space_id = ${i+2})" for i in range(len(space_id_list)))}
"""
await conn.execute(query, user_id, *space_id_list)
return True
The function accepts a list of space IDs and deletes all matching memberships in one query. The copy_members_trigger fires for each deleted row, keeping copy_members in sync.
Rotate candidate IDs
async def update_candidate_id(conn: asyncpg.Connection, user_id: int) -> bool:
query = """
UPDATE users
SET professional_id = $2, buyer_id = $3
WHERE user_id = $1
"""
await conn.execute(query, user_id, str(uuid.uuid4()), str(uuid.uuid4()))
return True
Both professional_id and buyer_id are regenerated as new UUIDs.
Why rotate IDs on exit?
This is a privacy primitive.
When a user contacts someone through the Search and Contact process, they share their candidate_id — either professional_id or buyer_id, depending on the perspective. The other party can use that ID to send follow-up messages.
If the user later decides a space is not worth keeping — for example, it turned out to be marketing spam — they want to sever the connection cleanly. Simply leaving the space is not enough, because the other party still has the old candidate_id and could use it to initiate contact again.
By rotating both IDs in the same transaction that deletes the memberships, the server ensures that any previously shared candidate IDs become invalid immediately. The other party cannot reconnect using the old ID. If the user wants to be contacted again in the future, they would need to share their new ID through a fresh interaction.
This is not a perfect anonymity guarantee — the other party could still recognize the user by name or conversation context. But it is a practical, low-cost barrier against automated re-contact, and it aligns with the principle that leaving a space should mean leaving it completely.
Relevant indexes and partitioning
Here are the indexes and partitioning choices most relevant to the Lead Engagement pipeline.
Messages
CREATE INDEX i_messages_chat_humans_only_created_at
ON messages (chat_id, humans_only, created_at);
CREATE INDEX i_messages_chat_created_at
ON messages (chat_id, created_at DESC);
Both indexes support list_chat_messages. The first covers the filtered case (humans_only = FALSE, time-windowed). The second covers the ORDER BY created_at DESC pattern used by within_rate_limit's lateral subquery.
messages is partitioned by chat_id, so all messages in a chat live in the same partition.
Chats
chats is partitioned by space_id. The ai_read_messages query filters by space_id and shared, so it hits exactly the relevant partitions.
Members and copy_members
members is partitioned by user_id. read_user_space_ids queries by user_id and hits one partition.
copy_members is partitioned by space_id. list_space_members and is_member query by space_id and hit one partition.
The trigger that keeps them in sync:
CREATE OR REPLACE FUNCTION copy_members()
RETURNS TRIGGER AS $$
BEGIN
IF TG_OP = 'INSERT' THEN
INSERT INTO copy_members (space_id, user_id, alias, avatar, description,
invited_by, is_admin, created_at)
VALUES (NEW.space_id, NEW.user_id, NEW.alias, NEW.avatar, NEW.description,
NEW.invited_by, NEW.is_admin, NEW.created_at);
ELSIF TG_OP = 'UPDATE' THEN
UPDATE copy_members
SET alias = NEW.alias, avatar = NEW.avatar, description = NEW.description,
invited_by = NEW.invited_by, is_admin = NEW.is_admin
WHERE space_id = NEW.space_id AND user_id = NEW.user_id;
ELSIF TG_OP = 'DELETE' THEN
DELETE FROM copy_members
WHERE space_id = OLD.space_id AND user_id = OLD.user_id;
END IF;
RETURN NULL;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER copy_members_trigger
AFTER INSERT OR UPDATE OR DELETE ON members
FOR EACH ROW
EXECUTE FUNCTION copy_members();
This is a classic CQRS-lite pattern implemented entirely within PostgreSQL. members is the write source, optimized for user-scoped queries. copy_members is the read replica, optimized for space-scoped queries. The trigger guarantees consistency without application-level coordination.
The full mental model
Here is the complete Lead Engagement flow, step by step:
- The agent calls
ai_read_messageswith alookback_window(default: 86400 seconds). - The server collects the user's space IDs from
members. - Shadow spaces are filtered out.
- For each remaining space, the server finds shared, non-shadow chats with recent inbound messages (not
humans_only, not created by the user). - Messages within the lookback window are returned, along with space members (with the current user's details stripped).
- The agent evaluates each chat. If the recent messages are insufficient to decide, it calls
ai_read_chat_messagesfor a full history. - The agent classifies each chat: reply directly, compact and fork, do nothing, or quit.
- After user confirmation, the agent calls
ai_create_messagefor short chats andai_create_chat_and_messagefor long chats — all in parallel. - For spaces to quit, the agent calls
ai_quit_spaceswith a list of space IDs. - The server deletes the memberships and rotates
professional_idandbuyer_idin a single transaction. - The agent evaluates whether the user's decisions reveal new preferences, and if so, triggers the Impression Management process.
Engineering takeaways
Here are the main design lessons from building this.
1. Two-layer reads prevent context-window bloat
Returning all messages from all chats would overwhelm the agent. The lookback_window snapshot gives the agent enough to triage. The deep-dive function gives it everything when needed. This split is not an optimization — it is a requirement for agent-driven workflows.
2. Server-side rate limiting is non-negotiable
Agents can be buggy, misconfigured, or prompt-injected. A 600-second cooldown enforced at the GraphQL layer is a safety net that no amount of client-side instruction can replace. The LATERAL JOIN pattern makes the check efficient even across many chats.
3. Context compression via chat forking is an agent-native pattern
Long chat threads are hostile to AI agents. Forking at 10 messages, with the agent responsible for compressing the history into the first message of the new chat, keeps contexts clean without losing continuity. The server provides the mechanism; the agent provides the intelligence.
4. Identity rotation is a practical privacy primitive
Regenerating professional_id and buyer_id on space exit is a low-cost way to prevent automated re-contact. It does not provide anonymity, but it raises the friction enough to deter casual follow-ups from spaces the user has explicitly abandoned.
5. Partitioning should match query patterns
members is partitioned by user_id because the dominant query is "find all spaces for this user". copy_members is partitioned by space_id because the dominant query is "find all members in this space". The trigger-based mirroring is a small write cost for a large read benefit.
6. Auth failure and permission failure should be distinct
Returning None for expired tokens and False for permission denials lets the agent respond appropriately: re-authenticate vs. notify and stop. For agent workflows, these distinctions are not cosmetic — they determine whether the agent retries or moves on.
Closing
Opportunity Skill is built around a simple belief:
In the AI-agent era, your inbox should not just be readable. It should be triaged, prioritized, and acted upon by your agent — with safety and privacy guarantees enforced by the platform, not left to agent discretion.
The Lead Engagement pipeline described here turns a natural-language request into:
- a time-windowed snapshot of all inbound conversations
- on-demand deep-dive access to full chat histories
- privacy-aware member context
- server-enforced rate limiting
- two write paths optimized for agent context management
- atomic space exit with identity rotation
Together with the Search and Contact pipeline, it forms a complete opportunity-capture loop: one side finds the right people, the other side handles the people who find you. Both feed back into the Impression Management process, continuously refining the user's agent-searchable profile.
If you want to try the skill, you can ask your agent to install it from:
https://github.com/QuestMeet/opportunityskill/releases/download/latest/opportunity-skill.zip
Previous article in this series: Architecting a Two-Stage Semantic Search Pipeline with HNSW, LATERAL JOIN, and Cubic Scoring


Top comments (0)