Skip to content

Commit 8f9fda3

Browse files
committed
feat: add watch_directory api
1 parent ff975d6 commit 8f9fda3

10 files changed

Lines changed: 1618 additions & 5 deletions

File tree

agb/api/client.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -189,7 +189,7 @@ def close(self):
189189

190190
async def call_api_async_with_requests(url, method="GET", headers=None, params=None, data=None, json=None, timeout=30):
191191
"""
192-
使用 aiohttp 实现异步 HTTP 请求,模拟 requests 的用法。
192+
Implement async HTTP requests using aiohttp, mimicking requests usage.
193193
"""
194194
async with aiohttp.ClientSession() as session:
195195
req_method = getattr(session, method.lower())
@@ -202,7 +202,7 @@ async def call_api_async_with_requests(url, method="GET", headers=None, params=N
202202
timeout=timeout
203203
) as resp:
204204
resp_data = await resp.text()
205-
# 你可以根据需要返回 resp.json() resp.read()
205+
# You can return resp.json() or resp.read() as needed
206206
return {
207207
"status_code": resp.status,
208208
"headers": dict(resp.headers),

agb/modules/file_system.py

Lines changed: 298 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,55 @@
1-
from typing import Any, Dict, List, Optional, Union
1+
from typing import Any, Dict, List, Optional, Union, Callable
22
import json
3+
import time
4+
import threading
5+
from collections import defaultdict
36

47
from agb.api.base_service import BaseService
58
from agb.model.response import ApiResponse, BoolResult
69

710

11+
class FileChangeEvent:
12+
"""Represents a single file change event."""
13+
14+
def __init__(
15+
self,
16+
event_type: str = "",
17+
path: str = "",
18+
path_type: str = "",
19+
):
20+
"""
21+
Initialize a FileChangeEvent.
22+
23+
Args:
24+
event_type (str): Type of the file change event (e.g., "modify", "create", "delete").
25+
path (str): Path of the file or directory that changed.
26+
path_type (str): Type of the path ("file" or "directory").
27+
"""
28+
self.event_type = event_type
29+
self.path = path
30+
self.path_type = path_type
31+
32+
def __repr__(self):
33+
return f"FileChangeEvent(event_type='{self.event_type}', path='{self.path}', path_type='{self.path_type}')"
34+
35+
def to_dict(self) -> Dict[str, str]:
36+
"""Convert to dictionary representation."""
37+
return {
38+
"eventType": self.event_type,
39+
"path": self.path,
40+
"pathType": self.path_type,
41+
}
42+
43+
@classmethod
44+
def from_dict(cls, data: Dict[str, str]) -> "FileChangeEvent":
45+
"""Create FileChangeEvent from dictionary."""
46+
return cls(
47+
event_type=data.get("eventType", ""),
48+
path=data.get("path", ""),
49+
path_type=data.get("pathType", ""),
50+
)
51+
52+
853
class FileInfoResult(ApiResponse):
954
"""Result of file info operations."""
1055

@@ -147,6 +192,69 @@ def __init__(
147192
self.error_message = error_message
148193

149194

195+
class FileChangeResult(ApiResponse):
196+
"""Result of file change detection operations."""
197+
198+
def __init__(
199+
self,
200+
request_id: str = "",
201+
success: bool = False,
202+
events: Optional[List[FileChangeEvent]] = None,
203+
raw_data: str = "",
204+
error_message: str = "",
205+
):
206+
"""
207+
Initialize a FileChangeResult.
208+
209+
Args:
210+
request_id (str, optional): Unique identifier for the API request.
211+
Defaults to "".
212+
success (bool, optional): Whether the operation was successful.
213+
Defaults to False.
214+
events (List[FileChangeEvent], optional): List of file change events.
215+
Defaults to None.
216+
raw_data (str, optional): Raw response data for debugging. Defaults to "".
217+
error_message (str, optional): Error message if the operation failed.
218+
Defaults to "".
219+
"""
220+
super().__init__(request_id)
221+
self.success = success
222+
self.events = events or []
223+
self.raw_data = raw_data
224+
self.error_message = error_message
225+
226+
def has_changes(self) -> bool:
227+
"""Check if there are any file changes."""
228+
return len(self.events) > 0
229+
230+
def get_modified_files(self) -> List[str]:
231+
"""Get list of modified file paths."""
232+
return [
233+
event.path
234+
for event in self.events
235+
if event.event_type == "modify" and event.path_type == "file"
236+
]
237+
238+
def get_created_files(self) -> List[str]:
239+
"""Get list of created file paths."""
240+
return [
241+
event.path
242+
for event in self.events
243+
if event.event_type == "create" and event.path_type == "file"
244+
]
245+
246+
def get_deleted_files(self) -> List[str]:
247+
"""Get list of deleted file paths."""
248+
return [
249+
event.path
250+
for event in self.events
251+
if event.event_type == "delete" and event.path_type == "file"
252+
]
253+
254+
def __repr__(self):
255+
return f"FileChangeResult(success={self.success}, events_count={len(self.events)})"
256+
257+
150258
class FileSystem(BaseService):
151259
"""
152260
Handles file operations in the AGB cloud environment.
@@ -786,3 +894,192 @@ def search_files(
786894
error_message=f"Failed to search files: {e}",
787895
)
788896

897+
def _get_file_change(self, path: str) -> FileChangeResult:
898+
"""
899+
Get file change information for the specified directory path.
900+
901+
Args:
902+
path: The directory path to monitor for file changes.
903+
904+
Returns:
905+
FileChangeResult: Result object containing parsed file change events and
906+
error message if any.
907+
"""
908+
909+
def parse_file_change_data(raw_data: str) -> List[FileChangeEvent]:
910+
"""
911+
Parse the raw file change data into FileChangeEvent objects.
912+
913+
Args:
914+
raw_data (str): Raw JSON string containing file change events.
915+
916+
Returns:
917+
List[FileChangeEvent]: List of parsed file change events.
918+
"""
919+
events = []
920+
try:
921+
# Parse the JSON array
922+
change_data = json.loads(raw_data)
923+
if isinstance(change_data, list):
924+
for event_dict in change_data:
925+
if isinstance(event_dict, dict):
926+
event = FileChangeEvent.from_dict(event_dict)
927+
events.append(event)
928+
else:
929+
print(f"Warning: Expected list but got {type(change_data)}")
930+
except json.JSONDecodeError as e:
931+
print(f"Warning: Failed to parse JSON data: {e}")
932+
print(f"Raw data: {raw_data}")
933+
except Exception as e:
934+
print(f"Warning: Unexpected error parsing file change data: {e}")
935+
936+
return events
937+
938+
args = {"path": path}
939+
try:
940+
result = self._call_mcp_tool("get_file_change", args)
941+
try:
942+
print("Response body:")
943+
print(
944+
json.dumps(
945+
getattr(result, "body", result), ensure_ascii=False, indent=2
946+
)
947+
)
948+
except Exception:
949+
print(f"Response: {result}")
950+
951+
if result.success:
952+
# Parse the file change events
953+
events = parse_file_change_data(result.data)
954+
return FileChangeResult(
955+
request_id=result.request_id,
956+
success=True,
957+
events=events,
958+
raw_data=result.data,
959+
)
960+
else:
961+
return FileChangeResult(
962+
request_id=result.request_id,
963+
success=False,
964+
raw_data=getattr(result, 'data', ''),
965+
error_message=result.error_message or "Failed to get file change",
966+
)
967+
except Exception as e:
968+
return FileChangeResult(
969+
request_id="",
970+
success=False,
971+
error_message=f"Failed to get file change: {e}",
972+
)
973+
974+
def watch_directory(
975+
self,
976+
path: str,
977+
callback: Callable[[List[FileChangeEvent]], None],
978+
interval: float = 1.0,
979+
stop_event: Optional[threading.Event] = None,
980+
) -> threading.Thread:
981+
"""
982+
Watch a directory for file changes and call the callback function when changes occur.
983+
984+
Args:
985+
path: The directory path to monitor for file changes.
986+
callback: Callback function that will be called with a list of FileChangeEvent
987+
objects when changes are detected.
988+
interval: Polling interval in seconds. Defaults to 1.0.
989+
stop_event: Optional threading.Event to stop the monitoring. If not provided,
990+
a new Event will be created and returned via the thread object.
991+
992+
Returns:
993+
threading.Thread: The monitoring thread. Call thread.start() to begin monitoring.
994+
Use the thread's stop_event attribute to stop monitoring.
995+
"""
996+
997+
def _monitor_directory():
998+
"""Internal function to monitor directory changes."""
999+
last_events = [] # Track last batch of events for deduplication
1000+
1001+
print(f"Starting directory monitoring for: {path}")
1002+
print(f"Polling interval: {interval} seconds")
1003+
1004+
while not stop_event.is_set():
1005+
try:
1006+
# Get current file changes
1007+
result = self._get_file_change(path)
1008+
1009+
if result.success:
1010+
# Check if current events are different from last events
1011+
current_events = result.events
1012+
1013+
# Compare with last events to avoid duplicates
1014+
if self._events_different(current_events, last_events):
1015+
print(f"Detected {len(current_events)} file changes (different from last):")
1016+
for event in current_events:
1017+
print(f" - {event}")
1018+
1019+
try:
1020+
callback(current_events)
1021+
except Exception as e:
1022+
print(f"Error in callback function: {e}")
1023+
1024+
# Update last events
1025+
last_events = current_events[:]
1026+
else:
1027+
print(f"Received {len(current_events)} events, but they are identical to last batch - skipping")
1028+
1029+
else:
1030+
print(f"Error monitoring directory: {result.error_message}")
1031+
1032+
# Wait for the next poll
1033+
stop_event.wait(interval)
1034+
1035+
except Exception as e:
1036+
print(f"Unexpected error in directory monitoring: {e}")
1037+
stop_event.wait(interval)
1038+
1039+
print(f"Stopped monitoring directory: {path}")
1040+
1041+
# Create stop event if not provided
1042+
if stop_event is None:
1043+
stop_event = threading.Event()
1044+
1045+
# Create and configure the monitoring thread
1046+
monitor_thread = threading.Thread(
1047+
target=_monitor_directory,
1048+
name=f"DirectoryWatcher-{path.replace('/', '_')}",
1049+
daemon=True
1050+
)
1051+
1052+
# Add stop_event as an attribute to the thread for easy access
1053+
monitor_thread.stop_event = stop_event
1054+
1055+
return monitor_thread
1056+
1057+
def _events_different(self, current_events: List[FileChangeEvent], last_events: List[FileChangeEvent]) -> bool:
1058+
"""
1059+
Compare two lists of events to determine if they are different.
1060+
1061+
Args:
1062+
current_events: Current batch of events
1063+
last_events: Previous batch of events
1064+
1065+
Returns:
1066+
bool: True if events are different, False if they are the same
1067+
"""
1068+
# If lengths are different, events are different
1069+
if len(current_events) != len(last_events):
1070+
return True
1071+
1072+
# If both are empty, they are the same
1073+
if len(current_events) == 0:
1074+
return False
1075+
1076+
# Compare each event
1077+
for current_event, last_event in zip(current_events, last_events):
1078+
if (current_event.event_type != last_event.event_type or
1079+
current_event.path != last_event.path or
1080+
current_event.path_type != last_event.path_type):
1081+
return True
1082+
1083+
# All events are identical
1084+
return False
1085+

docs/api-reference/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@ session.file_system.get_file_info(path: str) -> FileInfoResult
108108
session.file_system.move_file(source: str, destination: str) -> BoolResult
109109
session.file_system.edit_file(path: str, edits: List[dict]) -> BoolResult
110110
session.file_system.search_files(path: str, pattern: str) -> FileSearchResult
111+
session.file_system.watch_directory(path: str, callback: Callable, interval: float = 1.0, stop_event: Optional[threading.Event] = None) -> threading.Thread
111112
```
112113

113114
### OSS API

0 commit comments

Comments
 (0)