-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathconnection.py
More file actions
75 lines (60 loc) · 2.64 KB
/
connection.py
File metadata and controls
75 lines (60 loc) · 2.64 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
"""MongoDB connection manager."""
import ssl
from typing import Optional
import certifi
import structlog
from motor.motor_asyncio import AsyncIOMotorClient, AsyncIOMotorDatabase
# Module-level structlog logger used by MongoDBConnection for connect/disconnect events.
logger = structlog.get_logger()
class MongoDBConnection:
    """Manage the lifecycle of a single Motor (asyncio) MongoDB client.

    An instance starts out unconnected. ``connect`` creates the client,
    selects the database and verifies the server with a ``ping``;
    ``disconnect`` closes the client and returns the instance to the
    unconnected state. The ``database`` and ``client`` properties raise
    ``RuntimeError`` while unconnected.
    """

    def __init__(self) -> None:
        self._client: Optional[AsyncIOMotorClient] = None
        self._database: Optional[AsyncIOMotorDatabase] = None

    async def connect(self, mongodb_url: str, database_name: str) -> None:
        """Connect to MongoDB and verify the connection with a ping.

        Args:
            mongodb_url: MongoDB connection string. ``mongodb+srv://`` URLs
                (MongoDB Atlas) are opened with TLS, the certifi CA bundle
                and explicit timeouts; any other URL uses driver defaults.
            database_name: Name of the database exposed via ``database``.

        Raises:
            Exception: Any error raised while creating the client or pinging
                the server is logged and re-raised. The instance is left
                unconnected in that case.
        """
        # Close a client left over from an earlier connect() so it is not
        # leaked when this call replaces it.
        self._release()
        try:
            if mongodb_url.startswith("mongodb+srv://"):
                # TLS options are handed to the driver, which builds its own
                # SSL context; no hand-made ssl.SSLContext is needed here.
                client = AsyncIOMotorClient(
                    mongodb_url,
                    tls=True,
                    tlsCAFile=certifi.where(),
                    retryWrites=True,
                    serverSelectionTimeoutMS=5000,  # 5 second timeout
                    connectTimeoutMS=10000,  # 10 second connection timeout
                    socketTimeoutMS=20000,  # 20 second socket timeout
                )
            else:
                # For local MongoDB
                client = AsyncIOMotorClient(mongodb_url)

            self._client = client
            self._database = client[database_name]

            # Creating the client does not prove the server is reachable;
            # the ping forces a round trip.
            await client.admin.command("ping")
            logger.info("Connected to MongoDB", database=database_name)
        except Exception as e:
            logger.error("Failed to connect to MongoDB", error=str(e))
            # Do not keep a half-open client around: the properties must not
            # hand out a connection that never passed the ping.
            self._release()
            raise

    async def disconnect(self) -> None:
        """Disconnect from MongoDB; a no-op when not connected."""
        # Compare against None (as the properties do) instead of relying on
        # the truthiness of the client object.
        if self._client is None:
            return
        self._release()
        logger.info("Disconnected from MongoDB")

    def _release(self) -> None:
        """Close the current client, if any, and reset to the unconnected state."""
        client, self._client, self._database = self._client, None, None
        if client is not None:
            client.close()

    @property
    def database(self) -> AsyncIOMotorDatabase:
        """Return the connected database.

        Raises:
            RuntimeError: If no connection is currently established.
        """
        if self._database is None:
            raise RuntimeError("Database connection not established")
        return self._database

    @property
    def client(self) -> AsyncIOMotorClient:
        """Return the connected client.

        Raises:
            RuntimeError: If no connection is currently established.
        """
        if self._client is None:
            raise RuntimeError("Database connection not established")
        return self._client
# Global connection instance, created at import time in the unconnected state:
# connect() must be awaited before the database/client properties are read,
# otherwise they raise RuntimeError.
mongodb_connection = MongoDBConnection()