Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 28 additions & 7 deletions ex_app/lib/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,25 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
import json
import os
import string
import random
import string
from collections.abc import Awaitable, Callable
from datetime import date
from time import monotonic
from typing import Any, cast

from langchain_core.messages import ToolMessage, SystemMessage, AIMessage, HumanMessage, AIMessageChunk
from langchain_core.messages import AIMessage, AIMessageChunk, HumanMessage, SystemMessage, ToolMessage
from langchain_core.runnables import RunnableConfig
from nc_py_api import AsyncNextcloudApp
from nc_py_api.ex_app import persistent_storage

from ex_app.lib.signature import verify_signature
from ex_app.lib.signature import add_signature
from ex_app.lib.all_tools.skills import list_skills_metadata
from ex_app.lib.graph import AgentState, get_graph
from ex_app.lib.jsonplus import JsonPlusSerializer
from ex_app.lib.memorysaver import MemorySaver
from ex_app.lib.nc_model import model
from ex_app.lib.signature import add_signature, verify_signature
from ex_app.lib.tools import get_tools
from ex_app.lib.memorysaver import MemorySaver
from ex_app.lib.jsonplus import JsonPlusSerializer


# Dummy thread id as we return the whole state
thread = {"configurable": {"thread_id": "thread-1"}}
Expand Down Expand Up @@ -152,6 +151,28 @@ async def call_model(

if task['input'].get('memories', None) is not None and task['input'].get('memories', None) is not []:
system_prompt_text += "You can remember things from other conversations with the user. If relevant, take into account the following memories:\n\n" + "\n".join(task['input']['memories']) + "\n\n"
if tool_enabled("load_memory"):
Comment on lines 152 to +154
system_prompt_text += "In addition to the above memories, there are also long-term memories stored on-demand from other conversations. List and load those memories if they are not present here and the user or the conversation points to something that should be rememebered.\n"

if tool_enabled("load_skill"):
skills_metadata = await list_skills_metadata(nc)
if skills_metadata:
skill_lines = "\n".join(
f"- {s['name']}: {s['description']}" for s in skills_metadata
)
system_prompt_text += (
"You have access to the following skills. Each skill is a reusable, self-contained"
" procedure or guide stored by the user. If a skill is relevant to the user's request,"
" call the `load_skill` tool with its name to retrieve the full instructions before"
" acting on them. Do not mention skills to the user unless asked.\n\n"
"Available skills:\n" + skill_lines + "\n\n"
)
if tool_enabled("store_skill"):
system_prompt_text += (
"Only create a new skill with `store_skill` when the user explicitly asks you to,"
" or when they describe a clearly reusable procedure that should be remembered.\n"
)

# this is similar to customizing the create_react_agent with state_modifier, but is a lot more flexible
system_prompt = SystemMessage(
system_prompt_text.replace("{CURRENT_DATE}", current_date)
Expand Down
6 changes: 6 additions & 0 deletions ex_app/lib/all_tools/lib/decorator.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,5 +38,11 @@ async def wrapper(*args): # needs NextcloudApp as first arg
timestamp[user_id] = current_time
return result

async def cache_invalidate(nc):
user_id = await nc.user
cached_result.pop(user_id, None)
timestamp.pop(user_id, None)

wrapper.cache_invalidate = cache_invalidate
return wrapper
return decorator
181 changes: 181 additions & 0 deletions ex_app/lib/all_tools/skills.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
# SPDX-FileCopyrightText: 2026 Nextcloud GmbH and Nextcloud contributors
# SPDX-License-Identifier: AGPL-3.0-or-later

import logging
from urllib.parse import quote

from langchain_core.tools import tool
from nc_py_api import AsyncNextcloudApp
from nc_py_api._exceptions import NextcloudExceptionNotFound

from ex_app.lib.all_tools.lib.decorator import safe_tool, timed_memoize
from ex_app.lib.logger import log

# Skills follow the agentskills.io spec: each skill is a folder under
# "<Assistant folder>/Context Agent/Skills/<skillName>/" with a SKILL.md file
# containing YAML frontmatter (name, description) plus a markdown body.
# The Assistant app exposes OCS endpoints to list/load/store skills.

LIST_STORE_SKILLS_URL = '/ocs/v2.php/apps/assistant/api/v1/skills'
LOAD_SKILL_URL = '/ocs/v2.php/apps/assistant/api/v1/skills/{skillName}'
MAX_SKILL_NAME_LENGTH = 64
MAX_DESCRIPTION_LENGTH = 1024
MAX_CONTENT_LENGTH = 50_000

# skills are cached in the Assistant side for 24 hours backed by etags
# this cache interval is only a delay in cases where the skill is manually
# updated or added in the filesystem
SKILLS_CACHE_TTL = 5 * 60
Comment on lines +25 to +28


class AgentFacingError(Exception):
"""This exception's message would be returned to the agent to understand its mistake"""
...


def __validate_skill_name(skill_name: str) -> str:
if not skill_name or not skill_name.strip():
raise AgentFacingError('Skill name cannot be empty')
skill_name = skill_name.strip()

if len(skill_name) > MAX_SKILL_NAME_LENGTH:
raise AgentFacingError(
f'Skill name exceeds {MAX_SKILL_NAME_LENGTH} character limit '
f'({len(skill_name)} chars)'
)
return skill_name


@timed_memoize(SKILLS_CACHE_TTL)
async def list_skills_metadata(nc: AsyncNextcloudApp) -> list[dict[str, str]]:
"""
Fetch each skill's name and description from the assistant app.
Returns an empty list if the endpoint is unavailable or no skills exist.
"""
try:
res = await nc.ocs('GET', LIST_STORE_SKILLS_URL)
except NextcloudExceptionNotFound:
return []
except Exception as e:
return []

skills = res.get('skills') if isinstance(res, dict) else None
if not isinstance(skills, list):
return []
return [
{'name': str(s.get('name', '')), 'description': str(s.get('description', ''))}
for s in skills
if isinstance(s, dict) and s.get('name')
]


async def get_tools(nc: AsyncNextcloudApp):
@tool
@safe_tool
async def load_skill(skill_name: str):
"""
Load the full content of a skill (frontmatter + markdown body) by name.
Use this when one of the skills listed in the system prompt is relevant to
the user's request and you need its detailed instructions/procedure.
:param skill_name: The name of the skill (matches the `name` from the system prompt)
:return: The full SKILL.md content as a string.
"""
try:
skill_name = __validate_skill_name(skill_name)
except AgentFacingError as e:
return {'error': str(e)}

try:
res = await nc.ocs('GET', LOAD_SKILL_URL.format(skillName=quote(skill_name, safe='')))
except NextcloudExceptionNotFound:
return {'error': f'Skill "{skill_name}" not found'}
except Exception as e:
await log(
nc,
logging.WARNING,
f"Failed to load skill {skill_name} from user: {await nc.user}'s skills folder, exc: {e}",
)
return {'error': f'Failed to load the skill "{skill_name}"'}

if not isinstance(res, dict) or 'content' not in res:
return {'error': 'Malformed skill payload returned by the server'}
return res['content']

@tool
async def store_skill(skill_name: str, description: str, content: str):
Comment on lines +104 to +105
"""
Create or overwrite a skill. A skill is a reusable, self-contained markdown
document teaching the agent how to perform a particular task (procedure,
checklist, style guide, etc.). Only create a skill when the user explicitly
asks for one, or when the procedure is clearly reusable across future
conversations.

The `description` is shown to the agent in every future system prompt, so it
should be short and start with "Use this skill when ...".

The `content` is the markdown body of the skill (frontmatter is added by the
server automatically from `skill_name` and `description`).

:param skill_name: A short, human-readable name (max 64 chars)
:param description: One- or two-sentence description of when this skill applies (max 1024 chars)
:param content: The markdown body of the skill (max 50,000 chars)
:return: Status of the operation.
"""
try:
skill_name = __validate_skill_name(skill_name)
except AgentFacingError as e:
return {'error': str(e)}

if not description or not description.strip():
return {'error': 'Skill description cannot be empty'}
if len(description) > MAX_DESCRIPTION_LENGTH:
return {
'error': f'Skill description exceeds {MAX_DESCRIPTION_LENGTH} character limit '
f'({len(description)} chars)',
}
if not content or not content.strip():
return {'error': 'Skill content cannot be empty'}
if len(content) > MAX_CONTENT_LENGTH:
return {
'error': f'Skill content exceeds {MAX_CONTENT_LENGTH} character limit '
f'({len(content)} chars)',
}

try:
res = await nc.ocs(
'POST',
LIST_STORE_SKILLS_URL,
json={
'skillName': skill_name,
'description': description,
'content': content,
},
response_type='json',
)
except Exception as e:
await log(
nc,
logging.WARNING,
f"Failed to store skill {skill_name} in user: {await nc.user}'s skills folder, exc: {e}",
)
return {'error': f'Failed to store the skill {skill_name}'}

# invalidate the cached skills list for this user so the next system
# prompt reflects the new/updated skill
await list_skills_metadata.cache_invalidate(nc)

action = res.get('action') if isinstance(res, dict) else None
return {'status': 'success', 'action': action or 'stored', 'skill_name': skill_name}

return [load_skill, store_skill]


def get_category_name():
return 'Skills'


async def is_available(nc: AsyncNextcloudApp):
# Available as long as the assistant app exposes the skills endpoint.
# We don't probe here to avoid an extra request on every agent invocation;
# the tools themselves gracefully report errors if the endpoint is missing.
return True
17 changes: 12 additions & 5 deletions ex_app/lib/tools.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
# SPDX-FileCopyrightText: 2024 Nextcloud GmbH and Nextcloud contributors
# SPDX-License-Identifier: AGPL-3.0-or-later
import importlib
import json
import os
import pathlib
import json
from os.path import dirname

from nc_py_api import AsyncNextcloudApp

from ex_app.lib.all_tools.lib.decorator import timed_memoize


@timed_memoize(1*60)
async def get_tools(nc: AsyncNextcloudApp):
directory = dirname(__file__) + '/all_tools'
Expand Down Expand Up @@ -78,10 +80,15 @@ def get_categories():

def get_tool_module(file, directory):
module_name = pathlib.Path(file).stem # Extract module name without .py
module_path = os.path.join(directory, file)

spec = importlib.util.spec_from_file_location(module_name, module_path)
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
# Resolve via the canonical dotted package name so the module is shared
# with the rest of the codebase via sys.modules. This means module-level
# state (caches, singletons) survives across `get_tools` calls and a
# `from ex_app.lib.all_tools.<name> import ...` elsewhere refers to the
# SAME module object as the one we load here. Trade-off: hot-reloading
# tool files during development requires a process restart, which is fine.
qualified = f'ex_app.lib.all_tools.{module_name}'
module = importlib.import_module(qualified)
spec = module.__spec__

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice one!

Comment on lines +88 to +92

return module_name, spec, module
Loading