|
3 | 3 | from datetime import datetime, timezone |
4 | 4 | from dataclasses import dataclass, field |
5 | 5 |
|
6 | | -from mcp.server.fastmcp import FastMCP |
| 6 | +from mcp.server.fastmcp import FastMCP, Image |
| 7 | +from mcp.types import ImageContent, TextContent |
7 | 8 | from pydantic import BaseModel, Field |
8 | 9 |
|
9 | 10 | from scrapling.core.shell import Convertor |
|
31 | 32 | ) |
32 | 33 |
|
33 | 34 | SessionType = Literal["dynamic", "stealthy"] |
| 35 | +ScreenshotType = Literal["png", "jpeg"] |
34 | 36 |
|
35 | 37 |
|
36 | 38 | class ResponseModel(BaseModel): |
@@ -106,14 +108,14 @@ class ScraplingMCPServer: |
106 | 108 | def __init__(self): |
107 | 109 | self._sessions: Dict[str, _SessionEntry] = {} |
108 | 110 |
|
109 | | - def _get_session(self, session_id: str, expected_type: SessionType) -> _SessionEntry: |
110 | | - """Look up a session by ID and validate its type.""" |
| 111 | + def _get_session(self, session_id: str, expected_type: Optional[SessionType]) -> _SessionEntry: |
| 112 | + """Look up a session by ID, optionally validating its type. Pass `None` to skip the type check.""" |
111 | 113 | entry = self._sessions.get(session_id) |
112 | 114 | if entry is None: |
113 | 115 | raise ValueError(f"Session '{session_id}' not found. Use list_sessions to see active sessions.") |
114 | 116 | if not entry.session._is_alive: |
115 | 117 | raise ValueError(f"Session '{session_id}' is no longer alive. Open a new session.") |
116 | | - if entry.session_type != expected_type: |
| 118 | + if expected_type is not None and entry.session_type != expected_type: |
117 | 119 | raise ValueError( |
118 | 120 | f"Session '{session_id}' is a '{entry.session_type}' session, but this tool requires a " |
119 | 121 | f"'{expected_type}' session. Use the matching fetch tool for your session type." |
@@ -260,6 +262,69 @@ async def list_sessions(self) -> List[SessionInfo]: |
260 | 262 | for sid, entry in self._sessions.items() |
261 | 263 | ] |
262 | 264 |
|
async def screenshot(
    self,
    url: str,
    session_id: str,
    image_type: ScreenshotType = "png",
    full_page: bool = False,
    quality: Optional[int] = None,
    wait: int | float = 0,
    wait_selector: Optional[str] = None,
    wait_selector_state: SelectorWaitStates = "attached",
    network_idle: bool = False,
    timeout: int | float = 30000,
) -> List[ImageContent | TextContent]:
    """Capture a screenshot of a web page using an existing browser session and return it as an image.
    A browser session must be opened first with `open_session` (either `dynamic` or `stealthy`); the session ID is then passed here.

    :param url: The URL to navigate to and capture.
    :param session_id: ID of an open browser session created with `open_session`.
    :param image_type: Image format. Defaults to "png". Use "jpeg" for smaller file sizes.
    :param full_page: When True, captures the full scrollable page instead of just the viewport. Defaults to False.
    :param quality: Image quality (0-100) for JPEG only. Raises if passed with `image_type="png"` or if outside 0-100.
    :param wait: Time in milliseconds to wait after page load before capturing. Defaults to 0.
    :param wait_selector: Optional CSS selector to wait for before capturing.
    :param wait_selector_state: State to wait for the selector. Defaults to "attached".
    :param network_idle: Wait for the page until there are no network connections for at least 500 ms.
    :param timeout: Timeout in milliseconds for page operations. Defaults to 30,000.
    :returns: A two-element list: the captured image content block, then a text block holding the final page URL.
    :raises ValueError: If `quality` is combined with a non-JPEG format, is out of range, or the session is invalid.
    :raises RuntimeError: If navigation succeeded but no screenshot bytes were produced.
    """
    # Validate eagerly so bad arguments fail with a clear message before any
    # navigation happens (Playwright would otherwise reject these deep inside
    # the browser protocol after the page has already loaded).
    if quality is not None:
        if image_type != "jpeg":
            raise ValueError("'quality' is only valid when 'image_type' is 'jpeg'.")
        if not 0 <= quality <= 100:
            raise ValueError(f"'quality' must be between 0 and 100, got {quality}.")

    # `None` skips the session-type check: both dynamic and stealthy sessions
    # can take screenshots.
    entry = self._get_session(session_id, expected_type=None)

    screenshot_kwargs: Dict[str, Any] = {"type": image_type, "full_page": full_page}
    if quality is not None:
        screenshot_kwargs["quality"] = quality

    # Results are smuggled out of the page_action callback through this dict,
    # since the session's fetch API doesn't propagate the callback's return value.
    captured: Dict[str, Any] = {}

    async def _capture(page: Any) -> None:
        # Stash errors instead of raising so the session's fetch/cleanup path
        # completes normally; the error is re-raised below.
        try:
            captured["bytes"] = await page.screenshot(**screenshot_kwargs)
            captured["url"] = page.url
        except Exception as exc:
            captured["error"] = exc

    await entry.session.fetch(
        url,
        wait=wait,
        timeout=timeout,
        network_idle=network_idle,
        wait_selector=wait_selector,
        wait_selector_state=wait_selector_state,
        page_action=_capture,
    )

    if "error" in captured:
        raise captured["error"]
    if "bytes" not in captured:
        raise RuntimeError(f"Failed to capture screenshot for {url}")

    image = Image(data=captured["bytes"], format=image_type).to_image_content()
    return [image, TextContent(type="text", text=captured["url"])]
| 327 | + |
263 | 328 | @staticmethod |
264 | 329 | async def get( |
265 | 330 | url: str, |
@@ -298,7 +363,8 @@ async def get( |
298 | 363 | :param headers: Headers to include in the request. |
299 | 364 | :param cookies: Cookies to use in the request. |
300 | 365 | :param timeout: Number of seconds to wait before timing out. |
301 | | - :param follow_redirects: Whether to follow redirects. Defaults to "safe", which follows redirects but rejects those targeting internal/private IPs (SSRF protection). Pass True to follow all redirects without restriction. |
| 366 | + :param follow_redirects: Whether to follow redirects. Defaults to "safe", which follows redirects but rejects those targeting internal/private IPs (SSRF protection). |
| 367 | + Pass True to follow all redirects without restriction. |
302 | 368 | :param max_redirects: Maximum number of redirects. Default 30, use -1 for unlimited. |
303 | 369 | :param retries: Number of retry attempts. Defaults to 3. |
304 | 370 | :param retry_delay: Number of seconds to wait between retry attempts. Defaults to 1 second. |
@@ -371,7 +437,8 @@ async def bulk_get( |
371 | 437 | :param headers: Headers to include in the request. |
372 | 438 | :param cookies: Cookies to use in the request. |
373 | 439 | :param timeout: Number of seconds to wait before timing out. |
374 | | - :param follow_redirects: Whether to follow redirects. Defaults to "safe", which follows redirects but rejects those targeting internal/private IPs (SSRF protection). Pass True to follow all redirects without restriction. |
| 440 | + :param follow_redirects: Whether to follow redirects. Defaults to "safe", which follows redirects but rejects those targeting internal/private IPs (SSRF protection). |
| 441 | + Pass True to follow all redirects without restriction. |
375 | 442 | :param max_redirects: Maximum number of redirects. Default 30, use -1 for unlimited. |
376 | 443 | :param retries: Number of retry attempts. Defaults to 3. |
377 | 444 | :param retry_delay: Number of seconds to wait between retry attempts. Defaults to 1 second. |
@@ -835,4 +902,6 @@ def serve(self, http: bool, host: str, port: int): |
835 | 902 | description=self.bulk_stealthy_fetch.__doc__, |
836 | 903 | structured_output=True, |
837 | 904 | ) |
| 905 | + # Screenshot tool (returns image + url content blocks, not structured JSON) |
| 906 | + server.add_tool(self.screenshot, title="screenshot", description=self.screenshot.__doc__) |
838 | 907 | server.run(transport="stdio" if not http else "streamable-http") |
0 commit comments