-
-
Notifications
You must be signed in to change notification settings - Fork 2.1k
Expand file tree
/
Copy pathfetch_node.py
More file actions
409 lines (345 loc) · 15.3 KB
/
fetch_node.py
File metadata and controls
409 lines (345 loc) · 15.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
"""
FetchNode Module
"""
import json
from typing import List, Optional
import concurrent.futures
import requests
from langchain_community.document_loaders import PyPDFLoader
from langchain_core.documents import Document
from langchain_openai import AzureChatOpenAI, ChatOpenAI
from ..docloaders import ChromiumLoader
from ..utils.cleanup_html import cleanup_html
from ..utils.convert_to_md import convert_to_md
from .base_node import BaseNode
class FetchNode(BaseNode):
    """
    A node responsible for fetching the content of a source (a URL, a local HTML
    string, a single file or a directory reference) and updating the graph's state
    with this content.

    This node acts as a starting point in many scraping workflows, preparing the state
    with the necessary content for further processing by subsequent nodes in the graph.

    Attributes:
        headless (bool): Forwarded to ChromiumLoader as its ``headless`` argument.
        verbose (bool): Verbosity flag read from the node configuration.
        use_soup (bool): When true, URLs are fetched with ``requests`` instead of a
            document loader.
        loader_kwargs (dict): The ``loader_kwargs`` entry of the node configuration.
        llm_model: The configured model; when it is a ChatOpenAI or AzureChatOpenAI
            instance the fetched HTML is converted to Markdown.
        force (bool): Convert to Markdown regardless of the model type.
        script_creator (bool): When true, Markdown conversion is always skipped.
        openai_md_enabled (bool): On the loader path, disables the ``force``-triggered
            Markdown conversion.
        timeout (Optional[float]): Timeout in seconds for blocking operations
            (HTTP requests, PDF parsing, loaders). ``None`` disables the timeout.
        cut (bool): On the ``use_soup`` path, ``cleanup_html`` is applied only when
            this flag is false.
        browser_base (Optional[dict]): Browserbase settings (``api_key``, ``project_id``).
        scrape_do (Optional[dict]): scrape.do settings (``api_key``, ``use_proxy``,
            ``geoCode``, ``super_proxy``).
        plasmate (Optional[dict]): PlasmateLoader settings.
        storage_state: Forwarded to ChromiumLoader as its ``storage_state`` argument.

    Args:
        input (str): Boolean expression defining the input keys needed from the state.
        output (List[str]): List of output keys to be updated in the state.
        node_config (Optional[dict]): Additional configuration for the node.
        node_name (str): The unique identifier name for the node, defaulting to "Fetch".
    """

    def __init__(
        self,
        input: str,
        output: List[str],
        node_config: Optional[dict] = None,
        node_name: str = "Fetch",
    ):
        super().__init__(node_name, "node", input, output, 1, node_config)

        # Every option below has the same default whether the configuration is
        # missing entirely or merely lacks the key, so one lookup table suffices.
        config = {} if node_config is None else node_config

        self.headless = config.get("headless", True)
        self.verbose = config.get("verbose", False)
        self.use_soup = config.get("use_soup", False)
        self.loader_kwargs = config.get("loader_kwargs", {})
        self.llm_model = config.get("llm_model", {})
        self.force = config.get("force", False)
        self.script_creator = config.get("script_creator", False)
        self.openai_md_enabled = config.get("openai_md_enabled", False)

        # Timeout in seconds for blocking operations (HTTP requests, PDF parsing, etc.).
        # If set to None, no timeout will be applied.
        # `timeout` and `cut` intentionally keep their historical defaults, which
        # differ between "no configuration at all" and "configuration without the key".
        self.timeout = None if node_config is None else node_config.get("timeout", 30)
        self.cut = False if node_config is None else node_config.get("cut", True)

        self.browser_base = config.get("browser_base", None)
        self.scrape_do = config.get("scrape_do", None)
        self.plasmate = config.get("plasmate", None)
        self.storage_state = config.get("storage_state", None)

    def execute(self, state):
        """
        Executes the node's logic to fetch content from the configured source and
        update the state with this content.

        The first input key found in the state decides how the source is handled.

        Parameters:
            state (dict): The current state of the graph.

        Returns:
            dict: The updated state.

        Raises:
            ValueError: If the input key is not a supported input type.
        """
        self.logger.info(f"--- Executing {self.node_name} Node ---")

        input_keys = self.get_input_keys(state)
        input_type = input_keys[0]
        source = state[input_type]

        if input_type in ("json_dir", "xml_dir", "csv_dir", "pdf_dir", "md_dir"):
            return self.handle_directory(state, input_type, source)
        if input_type in ("pdf", "csv", "json", "xml", "md"):
            return self.handle_file(state, input_type, source)
        if input_type == "local_dir":
            return self.handle_local_source(state, source)
        if input_type == "url":
            return self.handle_web_source(state, source)
        raise ValueError(f"Invalid input type: {input_type}")

    def handle_directory(self, state, input_type, source):
        """
        Handles a directory input by storing the source, wrapped in a list, in the state.

        Parameters:
            state (dict): The current state of the graph.
            input_type (str): The type of input being processed (unused here).
            source (str): The directory source to store.

        Returns:
            dict: The updated state, with ``[source]`` under the first output key.
        """
        state.update({self.output[0]: [source]})
        return state

    def handle_file(self, state, input_type, source):
        """
        Loads the content of a single file and stores it in the state.

        Parameters:
            state (dict): The current state of the graph.
            input_type (str): The type of the input file
                ("pdf", "csv", "json", "xml" or "md").
            source (str): The path to the source file.

        Returns:
            dict: The updated state, with the loaded documents under the first output key.
        """
        state.update({self.output[0]: self.load_file_content(source, input_type)})
        return state

    def load_file_content(self, source, input_type):
        """
        Loads the content of a file based on its input type.

        Parameters:
            source (str): The path to the source file.
            input_type (str): The type of the input file
                ("pdf", "csv", "json", "xml" or "md").

        Returns:
            list: A list of Document objects with the loaded content and metadata.

        Raises:
            TimeoutError: If PDF parsing exceeds the configured timeout.
            ImportError: If a CSV file is requested and pandas is not installed.
            ValueError: If the input type is not supported.
        """
        if input_type == "pdf":
            return self._load_pdf(source)

        if input_type == "csv":
            try:
                import pandas as pd
            except ImportError as exc:
                raise ImportError(
                    "pandas is not installed. Please install it using `pip install pandas`."
                ) from exc
            # to_string() renders the complete frame; str(DataFrame) silently elides
            # rows and columns according to pandas' display options.
            return [
                Document(
                    page_content=pd.read_csv(source).to_string(),
                    metadata={"source": "csv"},
                )
            ]

        if input_type == "json":
            with open(source, encoding="utf-8") as f:
                return [
                    Document(
                        page_content=str(json.load(f)), metadata={"source": "json"}
                    )
                ]

        if input_type in ("xml", "md"):
            with open(source, "r", encoding="utf-8") as f:
                data = f.read()
            return [Document(page_content=data, metadata={"source": input_type})]

        # Previously an unknown type fell through and returned None, which was then
        # written to the state; fail loudly instead, as execute() does.
        raise ValueError(f"Invalid input type: {input_type}")

    def _load_pdf(self, source):
        """Load a PDF with PyPDFLoader, enforcing ``self.timeout`` when it is set."""
        loader = PyPDFLoader(source)
        if self.timeout is None:
            return loader.load()

        # PyPDFLoader.load() can be blocking for large PDFs, so it runs in a worker
        # thread. The executor is deliberately NOT used as a context manager: leaving
        # a `with` block calls shutdown(wait=True), which would wait for the parse to
        # finish and only then let the timeout error propagate.
        executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
        try:
            future = executor.submit(loader.load)
            return future.result(timeout=self.timeout)
        except concurrent.futures.TimeoutError as exc:
            raise TimeoutError(
                f"PDF parsing exceeded timeout of {self.timeout} seconds"
            ) from exc
        finally:
            # A running thread cannot be interrupted; the worker is left to finish in
            # the background while the caller regains control immediately.
            executor.shutdown(wait=False)

    def _should_convert_to_md(self, respect_openai_md_flag=False):
        """
        Decide whether fetched HTML should be converted to Markdown.

        Conversion never happens when ``script_creator`` is set. Otherwise it happens
        for ChatOpenAI / AzureChatOpenAI models, or when ``force`` is set. With
        ``respect_openai_md_flag`` the ``force`` trigger is additionally suppressed
        when ``openai_md_enabled`` is set (the rule used on the loader path).
        """
        if self.script_creator:
            return False
        if isinstance(self.llm_model, (ChatOpenAI, AzureChatOpenAI)):
            return True
        if not self.force:
            return False
        return not (respect_openai_md_flag and self.openai_md_enabled)

    def handle_local_source(self, state, source):
        """
        Handles the local source by taking the given HTML content, optionally
        converting it to Markdown, and updating the state.

        Parameters:
            state (dict): The current state of the graph.
            source (str): The HTML content from the local source.

        Returns:
            dict: The updated state with the processed content.

        Raises:
            ValueError: If the source is empty or contains only whitespace.
        """
        self.logger.info(f"--- (Fetching HTML from: {source}) ---")

        if not source.strip():
            raise ValueError("No HTML body content found in the local source.")

        parsed_content = (
            convert_to_md(source) if self._should_convert_to_md() else source
        )

        compressed_document = [
            Document(page_content=parsed_content, metadata={"source": "local_dir"})
        ]
        state.update({self.output[0]: compressed_document})
        return state

    def handle_web_source(self, state, source):
        """
        Handles the web source by fetching HTML content from a URL,
        optionally converting it to Markdown, and updating the state.

        Parameters:
            state (dict): The current state of the graph.
            source (str): The URL of the web source to fetch HTML content from.

        Returns:
            dict: The updated state with the processed content.

        Raises:
            ValueError: If the page could not be retrieved, or the fetched HTML
                content is empty or contains only whitespace.
            ImportError: If Browserbase is configured but its module is unavailable.
        """
        self.logger.info(f"--- (Fetching HTML from: {source}) ---")

        if self.use_soup:
            # This path produces no loader document, so state["doc"] is not written
            # here (the name `document` was never defined on this path).
            compressed_document = self._fetch_with_requests(source)
        else:
            document = self._load_web_document(source)

            if not document or not document[0].page_content.strip():
                raise ValueError(
                    "No HTML body content found in\n"
                    "the document fetched by ChromiumLoader."
                )

            parsed_content = document[0].page_content
            if self._should_convert_to_md(respect_openai_md_flag=True):
                # NOTE(review): the second argument is presumably the page URL
                # (handle_local_source passes the HTML as the first argument);
                # previously the HTML itself was passed twice — confirm against
                # utils.convert_to_md.
                parsed_content = convert_to_md(parsed_content, source)

            compressed_document = [
                Document(page_content=parsed_content, metadata={"source": "html file"})
            ]
            state["doc"] = document

        state.update({self.output[0]: compressed_document})
        return state

    def _fetch_with_requests(self, source):
        """Fetch ``source`` with ``requests`` and return it as a one-Document list."""
        # Apply configured timeout to blocking HTTP requests. If timeout is None,
        # don't pass the timeout argument (requests will block until completion).
        if self.timeout is None:
            response = requests.get(source)
        else:
            response = requests.get(source, timeout=self.timeout)

        if response.status_code != 200:
            self.logger.warning(
                f"Failed to retrieve contents from the webpage at url: {source}"
            )
            # Without a page there is nothing to store; previously execution carried
            # on and crashed on an unassigned local variable.
            raise ValueError(
                f"Failed to retrieve contents from the webpage at url: {source} "
                f"(HTTP status {response.status_code})"
            )

        if not response.text.strip():
            raise ValueError("No HTML body content found in the response.")

        # Start from the raw body so the content is defined even when `cut` is set
        # and the cleanup step is skipped.
        parsed_content = response.text
        if not self.cut:
            # NOTE(review): the Response object (not response.text) is handed to
            # cleanup_html, exactly as before — confirm the helper's expected input
            # and return type.
            parsed_content = cleanup_html(response, source)

        if self._should_convert_to_md():
            # NOTE(review): arguments were previously passed as (source, content);
            # the HTML goes first, as in handle_local_source, and the URL is
            # presumably the second parameter — confirm against utils.convert_to_md.
            parsed_content = convert_to_md(parsed_content, source)

        return [Document(page_content=parsed_content)]

    def _load_web_document(self, source):
        """Fetch ``source`` through the configured backend and return its documents."""
        if self.browser_base:
            try:
                from ..docloaders.browser_base import browser_base_fetch
            except ImportError as exc:
                raise ImportError(
                    "The browserbase module is not installed.\n"
                    "Please install it using `pip install browserbase`."
                ) from exc

            data = browser_base_fetch(
                self.browser_base.get("api_key"),
                self.browser_base.get("project_id"),
                [source],
            )
            return [
                Document(page_content=content, metadata={"source": source})
                for content in data
            ]

        if self.scrape_do:
            from ..docloaders.scrape_do import scrape_do_fetch

            api_key = self.scrape_do.get("api_key")
            proxy_args = (
                self.scrape_do.get("use_proxy"),
                self.scrape_do.get("geoCode"),
                self.scrape_do.get("super_proxy"),
            )
            # The proxy options are only forwarded when all three are configured.
            if any(arg is None for arg in proxy_args):
                data = scrape_do_fetch(api_key, source)
            else:
                data = scrape_do_fetch(api_key, source, *proxy_args)
            return [Document(page_content=data, metadata={"source": source})]

        if self.plasmate is not None:
            from ..docloaders.plasmate import PlasmateLoader

            plasmate_cfg = self.plasmate if isinstance(self.plasmate, dict) else {}
            loader = PlasmateLoader(
                [source],
                output_format=plasmate_cfg.get("output_format", "text"),
                timeout=plasmate_cfg.get("timeout", self.timeout or 30),
                selector=plasmate_cfg.get("selector"),
                extra_headers=plasmate_cfg.get("extra_headers", {}),
                fallback_to_chrome=plasmate_cfg.get("fallback_to_chrome", False),
            )
            return loader.load()

        # Work on a copy so that injecting the timeout below does not mutate the
        # dictionary stored in the caller's node configuration.
        loader_kwargs = {}
        if self.node_config:
            loader_kwargs = dict(self.node_config.get("loader_kwargs") or {})

        # If a global timeout is configured on the node and no loader-specific timeout
        # was provided, propagate it to ChromiumLoader so it can apply the same limit.
        if "timeout" not in loader_kwargs and self.timeout is not None:
            loader_kwargs["timeout"] = self.timeout

        loader = ChromiumLoader(
            [source],
            headless=self.headless,
            storage_state=self.storage_state,
            **loader_kwargs,
        )
        return loader.load()