diff --git a/ex_app/lib/agent.py b/ex_app/lib/agent.py index a1183b2..0d65ad7 100644 --- a/ex_app/lib/agent.py +++ b/ex_app/lib/agent.py @@ -2,26 +2,25 @@ # SPDX-License-Identifier: AGPL-3.0-or-later import json import os -import string import random +import string from collections.abc import Awaitable, Callable from datetime import date from time import monotonic from typing import Any, cast -from langchain_core.messages import ToolMessage, SystemMessage, AIMessage, HumanMessage, AIMessageChunk +from langchain_core.messages import AIMessage, AIMessageChunk, HumanMessage, SystemMessage, ToolMessage from langchain_core.runnables import RunnableConfig from nc_py_api import AsyncNextcloudApp from nc_py_api.ex_app import persistent_storage -from ex_app.lib.signature import verify_signature -from ex_app.lib.signature import add_signature +from ex_app.lib.all_tools.skills import list_skills_metadata from ex_app.lib.graph import AgentState, get_graph +from ex_app.lib.jsonplus import JsonPlusSerializer +from ex_app.lib.memorysaver import MemorySaver from ex_app.lib.nc_model import model +from ex_app.lib.signature import add_signature, verify_signature from ex_app.lib.tools import get_tools -from ex_app.lib.memorysaver import MemorySaver -from ex_app.lib.jsonplus import JsonPlusSerializer - # Dummy thread id as we return the whole state thread = {"configurable": {"thread_id": "thread-1"}} @@ -152,6 +151,28 @@ async def call_model( if task['input'].get('memories', None) is not None and task['input'].get('memories', None) is not []: system_prompt_text += "You can remember things from other conversations with the user. If relevant, take into account the following memories:\n\n" + "\n".join(task['input']['memories']) + "\n\n" + if tool_enabled("load_memory"): + system_prompt_text += "In addition to the above memories, there are also long-term memories stored on-demand from other conversations. List and load those memories if they are not present here and the user or the conversation points to something that should be rememebered.\n" + + if tool_enabled("load_skill"): + skills_metadata = await list_skills_metadata(nc) + if skills_metadata: + skill_lines = "\n".join( + f"- {s['name']}: {s['description']}" for s in skills_metadata + ) + system_prompt_text += ( + "You have access to the following skills. Each skill is a reusable, self-contained" + " procedure or guide stored by the user. If a skill is relevant to the user's request," + " call the `load_skill` tool with its name to retrieve the full instructions before" + " acting on them. Do not mention skills to the user unless asked.\n\n" + "Available skills:\n" + skill_lines + "\n\n" + ) + if tool_enabled("store_skill"): + system_prompt_text += ( + "Only create a new skill with `store_skill` when the user explicitly asks you to," + " or when they describe a clearly reusable procedure that should be remembered.\n" + ) + # this is similar to customizing the create_react_agent with state_modifier, but is a lot more flexible system_prompt = SystemMessage( system_prompt_text.replace("{CURRENT_DATE}", current_date) diff --git a/ex_app/lib/all_tools/lib/decorator.py b/ex_app/lib/all_tools/lib/decorator.py index 31dbf1b..9084cd3 100644 --- a/ex_app/lib/all_tools/lib/decorator.py +++ b/ex_app/lib/all_tools/lib/decorator.py @@ -38,5 +38,11 @@ async def wrapper(*args): # needs NextcloudApp as first arg timestamp[user_id] = current_time return result + async def cache_invalidate(nc): + user_id = await nc.user + cached_result.pop(user_id, None) + timestamp.pop(user_id, None) + + wrapper.cache_invalidate = cache_invalidate return wrapper return decorator diff --git a/ex_app/lib/all_tools/skills.py b/ex_app/lib/all_tools/skills.py new file mode 100644 index 0000000..5675f0d --- /dev/null +++ b/ex_app/lib/all_tools/skills.py @@ -0,0 +1,181 @@ +# SPDX-FileCopyrightText: 2026 Nextcloud GmbH and Nextcloud contributors +# SPDX-License-Identifier: AGPL-3.0-or-later + +import logging +from urllib.parse import quote + +from langchain_core.tools import tool +from nc_py_api import AsyncNextcloudApp +from nc_py_api._exceptions import NextcloudExceptionNotFound + +from ex_app.lib.all_tools.lib.decorator import safe_tool, timed_memoize +from ex_app.lib.logger import log + +# Skills follow the agentskills.io spec: each skill is a folder under +# "/Context Agent/Skills//" with a SKILL.md file +# containing YAML frontmatter (name, description) plus a markdown body. +# The Assistant app exposes OCS endpoints to list/load/store skills. + +LIST_STORE_SKILLS_URL = '/ocs/v2.php/apps/assistant/api/v1/skills' +LOAD_SKILL_URL = '/ocs/v2.php/apps/assistant/api/v1/skills/{skillName}' +MAX_SKILL_NAME_LENGTH = 64 +MAX_DESCRIPTION_LENGTH = 1024 +MAX_CONTENT_LENGTH = 50_000 + +# skills are cached in the Assistant side for 24 hours backed by etags +# this cache interval is only a delay in cases where the skill is manually +# updated or added in the filesystem +SKILLS_CACHE_TTL = 5 * 60 + + +class AgentFacingError(Exception): + """This exception's message would be returned to the agent to understand its mistake""" + ... + + +def __validate_skill_name(skill_name: str) -> str: + if not skill_name or not skill_name.strip(): + raise AgentFacingError('Skill name cannot be empty') + skill_name = skill_name.strip() + + if len(skill_name) > MAX_SKILL_NAME_LENGTH: + raise AgentFacingError( + f'Skill name exceeds {MAX_SKILL_NAME_LENGTH} character limit ' + f'({len(skill_name)} chars)' + ) + return skill_name + + +@timed_memoize(SKILLS_CACHE_TTL) +async def list_skills_metadata(nc: AsyncNextcloudApp) -> list[dict[str, str]]: + """ + Fetch each skill's name and description from the assistant app. + Returns an empty list if the endpoint is unavailable or no skills exist. + """ + try: + res = await nc.ocs('GET', LIST_STORE_SKILLS_URL) + except NextcloudExceptionNotFound: + return [] + except Exception as e: + return [] + + skills = res.get('skills') if isinstance(res, dict) else None + if not isinstance(skills, list): + return [] + return [ + {'name': str(s.get('name', '')), 'description': str(s.get('description', ''))} + for s in skills + if isinstance(s, dict) and s.get('name') + ] + + +async def get_tools(nc: AsyncNextcloudApp): + @tool + @safe_tool + async def load_skill(skill_name: str): + """ + Load the full content of a skill (frontmatter + markdown body) by name. + Use this when one of the skills listed in the system prompt is relevant to + the user's request and you need its detailed instructions/procedure. + :param skill_name: The name of the skill (matches the `name` from the system prompt) + :return: The full SKILL.md content as a string. + """ + try: + skill_name = __validate_skill_name(skill_name) + except AgentFacingError as e: + return {'error': str(e)} + + try: + res = await nc.ocs('GET', LOAD_SKILL_URL.format(skillName=quote(skill_name, safe=''))) + except NextcloudExceptionNotFound: + return {'error': f'Skill "{skill_name}" not found'} + except Exception as e: + await log( + nc, + logging.WARNING, + f"Failed to load skill {skill_name} from user: {await nc.user}'s skills folder, exc: {e}", + ) + return {'error': f'Failed to load the skill "{skill_name}"'} + + if not isinstance(res, dict) or 'content' not in res: + return {'error': 'Malformed skill payload returned by the server'} + return res['content'] + + @tool + async def store_skill(skill_name: str, description: str, content: str): + """ + Create or overwrite a skill. A skill is a reusable, self-contained markdown + document teaching the agent how to perform a particular task (procedure, + checklist, style guide, etc.). Only create a skill when the user explicitly + asks for one, or when the procedure is clearly reusable across future + conversations. + + The `description` is shown to the agent in every future system prompt, so it + should be short and start with "Use this skill when ...". + + The `content` is the markdown body of the skill (frontmatter is added by the + server automatically from `skill_name` and `description`). + + :param skill_name: A short, human-readable name (max 64 chars) + :param description: One- or two-sentence description of when this skill applies (max 1024 chars) + :param content: The markdown body of the skill (max 50,000 chars) + :return: Status of the operation. + """ + try: + skill_name = __validate_skill_name(skill_name) + except AgentFacingError as e: + return {'error': str(e)} + + if not description or not description.strip(): + return {'error': 'Skill description cannot be empty'} + if len(description) > MAX_DESCRIPTION_LENGTH: + return { + 'error': f'Skill description exceeds {MAX_DESCRIPTION_LENGTH} character limit ' + f'({len(description)} chars)', + } + if not content or not content.strip(): + return {'error': 'Skill content cannot be empty'} + if len(content) > MAX_CONTENT_LENGTH: + return { + 'error': f'Skill content exceeds {MAX_CONTENT_LENGTH} character limit ' + f'({len(content)} chars)', + } + + try: + res = await nc.ocs( + 'POST', + LIST_STORE_SKILLS_URL, + json={ + 'skillName': skill_name, + 'description': description, + 'content': content, + }, + response_type='json', + ) + except Exception as e: + await log( + nc, + logging.WARNING, + f"Failed to store skill {skill_name} in user: {await nc.user}'s skills folder, exc: {e}", + ) + return {'error': f'Failed to store the skill {skill_name}'} + + # invalidate the cached skills list for this user so the next system + # prompt reflects the new/updated skill + await list_skills_metadata.cache_invalidate(nc) + + action = res.get('action') if isinstance(res, dict) else None + return {'status': 'success', 'action': action or 'stored', 'skill_name': skill_name} + + return [load_skill, store_skill] + + +def get_category_name(): + return 'Skills' + + +async def is_available(nc: AsyncNextcloudApp): + # Available as long as the assistant app exposes the skills endpoint. + # We don't probe here to avoid an extra request on every agent invocation; + # the tools themselves gracefully report errors if the endpoint is missing. + return True diff --git a/ex_app/lib/tools.py b/ex_app/lib/tools.py index 752b941..5755ff2 100644 --- a/ex_app/lib/tools.py +++ b/ex_app/lib/tools.py @@ -1,14 +1,16 @@ # SPDX-FileCopyrightText: 2024 Nextcloud GmbH and Nextcloud contributors # SPDX-License-Identifier: AGPL-3.0-or-later import importlib +import json import os import pathlib -import json from os.path import dirname from nc_py_api import AsyncNextcloudApp + from ex_app.lib.all_tools.lib.decorator import timed_memoize + @timed_memoize(1*60) async def get_tools(nc: AsyncNextcloudApp): directory = dirname(__file__) + '/all_tools' @@ -78,10 +80,15 @@ def get_categories(): def get_tool_module(file, directory): module_name = pathlib.Path(file).stem # Extract module name without .py - module_path = os.path.join(directory, file) - spec = importlib.util.spec_from_file_location(module_name, module_path) - module = importlib.util.module_from_spec(spec) - spec.loader.exec_module(module) + # Resolve via the canonical dotted package name so the module is shared + # with the rest of the codebase via sys.modules. This means module-level + # state (caches, singletons) survives across `get_tools` calls and a + # `from ex_app.lib.all_tools. import ...` elsewhere refers to the + # SAME module object as the one we load here. Trade-off: hot-reloading + # tool files during development requires a process restart, which is fine. + qualified = f'ex_app.lib.all_tools.{module_name}' + module = importlib.import_module(qualified) + spec = module.__spec__ return module_name, spec, module \ No newline at end of file