Skip to content

Commit 7a21422

Browse files
feat(interactive): Implement restart for insight (#4555)
For restarting groot service, we need to send request to groot store pod. --------- Co-authored-by: BingqingLyu <bingqing.lbq@alibaba-inc.com>
1 parent e739cdf commit 7a21422

8 files changed

Lines changed: 246 additions & 1 deletion

File tree

charts/graphscope-store/templates/portal/statefulset.yaml

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,14 +107,32 @@ spec:
107107
value: {{ .Values.frontend.service.gremlinPort | quote }}
108108
- name: GROOT_CYPHER_PORT
109109
value: {{ .Values.frontend.service.cypherPort | quote }}
110+
- name: GROOT_STORE_POD_ADMIN_PORT
111+
value: {{ .Values.store.service.adminPort | quote }}
110112
- name: GROOT_FRONTEND_POD_SUFFIX
113+
{{- if .Values.distributed.enabled }}
111114
value: {{ printf "%s-frontend" (include "graphscope-store.name" .) }}
115+
{{- else }}
116+
value: {{ printf "%s-portal" (include "graphscope-store.name" .) }}
117+
{{- end }}
112118
- name: GROOT_COORDINATOR_POD_SUFFIX
119+
{{- if .Values.distributed.enabled }}
113120
value: {{ printf "%s-coordinator" (include "graphscope-store.name" .) }}
121+
{{- else }}
122+
value: {{ printf "%s-portal" (include "graphscope-store.name" .) }}
123+
{{- end }}
114124
- name: GROOT_PORTAAL_POD_SUFFIX
125+
{{- if .Values.distributed.enabled }}
115126
value: {{ printf "%s-portal" (include "graphscope-store.name" .) }}
127+
{{- else }}
128+
value: {{ printf "%s-portal" (include "graphscope-store.name" .) }}
129+
{{- end }}
116130
- name: GROOT_STORE_POD_SUFFIX
131+
{{- if .Values.distributed.enabled }}
117132
value: {{ printf "%s-store" (include "graphscope-store.name" .) }}
133+
{{- else }}
134+
value: {{ printf "%s-portal" (include "graphscope-store.name" .) }}
135+
{{- end }}
118136
- name: INSTANCE_NAME
119137
value: {{ .Release.Name | quote }}
120138
- name: NAMESPACE

charts/graphscope-store/templates/store/statefulset.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,10 @@ spec:
107107
valueFrom:
108108
fieldRef:
109109
fieldPath: metadata.name
110+
{{- if .Values.store.service.adminPort }}
111+
- name: GROOT_STORE_POD_ADMIN_PORT
112+
value: {{ .Values.store.service.adminPort | quote }}
113+
{{- end }}
110114
ports:
111115
- name: inner-rpc
112116
containerPort: 55555

charts/graphscope-store/values.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,9 @@ store:
7474
size: 1Gi
7575
selector: {}
7676

77+
service:
78+
adminPort: 10001
79+
7780
## @param hostAliases pods host aliases
7881
## https://kubernetes.io/docs/concepts/services-networking/add-entries-to-pod-etc-hosts-with-host-aliases/
7982
##

coordinator/gscoordinator/flex/core/config.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ def str_to_bool(s):
112112
GROOT_STORE_POD_SUFFIX = os.environ.get("GROOT_STORE_POD_SUFFIX", "graphscope-store-store")
113113
GROOT_COORDINATOR_POD_SUFFIX = os.environ.get("GROOT_COORDINATOR_POD_SUFFIX", "graphscope-store-coordinator")
114114
GROOT_PORTAL_POD_SUFFIX = os.environ.get("GROOT_PORTAL_POD_SUFFIX", "graphscope-store-portal")
115-
115+
GROOT_STORE_POD_ADMIN_PORT = os.environ.get("GROOT_STORE_POD_ADMIN_PORT", 10001)
116116

117117

118118
# dataloading service for groot

coordinator/gscoordinator/flex/core/insight/groot.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,15 @@
2828
import psutil
2929
from graphscope.config import Config
3030
from gremlin_python.driver.client import Client
31+
from kubernetes import client as kube_client
32+
from kubernetes import config as kube_config
3133

3234
from gscoordinator.flex.core.config import CLUSTER_TYPE
3335
from gscoordinator.flex.core.config import CREATION_TIME
36+
from gscoordinator.flex.core.config import GROOT_STORE_POD_ADMIN_PORT
37+
from gscoordinator.flex.core.config import GROOT_STORE_POD_SUFFIX
38+
from gscoordinator.flex.core.config import INSTANCE_NAME
39+
from gscoordinator.flex.core.config import NAMESPACE
3440
from gscoordinator.flex.core.config import STUDIO_WRAPPER_ENDPOINT
3541
from gscoordinator.flex.core.config import WORKSPACE
3642
from gscoordinator.flex.core.datasource import DataSourceManager
@@ -41,6 +47,8 @@
4147
from gscoordinator.flex.core.scheduler import cancel_job
4248
from gscoordinator.flex.core.scheduler import schedule
4349
from gscoordinator.flex.core.utils import encode_datetime
50+
from gscoordinator.flex.core.utils import get_pod_ips
51+
from gscoordinator.flex.core.utils import resolve_api_client
4452
from gscoordinator.flex.models import JobStatus
4553

4654
logger = logging.getLogger("graphscope")
@@ -93,6 +101,24 @@ def _pickle_job_status_impl(self):
93101
pickle.dump(status, f)
94102
except Exception as e:
95103
logger.warn("Pickle job status failed: %s", str(e))
104+
105+
def _restart_pod(self, pod_name, pod_ip, port):
106+
logger.info(f"Restart groot store pod {pod_name}, ip {pod_ip}")
107+
conn = http.client.HTTPConnection(pod_ip, port)
108+
conn.request("POST", "/shutdown")
109+
# expect the request didn't get any response, since the pod will kill it self
110+
try:
111+
r = conn.getresponse()
112+
if r.status != 500 or r.status != 503:
113+
raise RuntimeError("Failed to restart groot store pod: " + r.read().decode("utf-8"))
114+
else:
115+
logger.info(f"Restart groot store pod {pod_name} successfully")
116+
except http.client.RemoteDisconnected:
117+
logger.info(f"Restart groot store pod {pod_name} successfully")
118+
except Exception as e:
119+
raise RuntimeError("Failed to restart groot store pod: " + str(e))
120+
finally:
121+
conn.close()
96122

97123
def check_graph_exists(self, graph_id: str):
98124
if self._graph.id != graph_id:
@@ -115,6 +141,20 @@ def list_service_status(self) -> List[dict]:
115141
res[0]["sdk_endpoints"]["cypher"] = groot_endpoints["cypher_endpoint"]
116142
return res
117143

144+
def stop_service(self) -> str:
145+
raise RuntimeError("Stop service is not supported yet.")
146+
147+
def restart_service(self) -> str:
148+
api_client = resolve_api_client()
149+
pod_prefix = "{0}-{1}".format(INSTANCE_NAME, GROOT_STORE_POD_SUFFIX)
150+
ip_names = get_pod_ips(api_client, NAMESPACE, pod_prefix)
151+
for (ip, name) in ip_names:
152+
logger.info(f"Restart groot store pod {name}, ip {ip}")
153+
self._restart_pod(name, ip, GROOT_STORE_POD_ADMIN_PORT)
154+
155+
def start_service(self, graph_id: str) -> str:
156+
raise RuntimeError("Start service is not supported yet.")
157+
118158
def create_graph(self, graph: dict) -> dict:
119159
raise RuntimeError("Create graph is not supported yet.")
120160

coordinator/gscoordinator/flex/core/utils.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,38 @@ def resolve_api_client(k8s_config_file=None):
6767
raise RuntimeError("Resolve kube api client failed.")
6868
return kube_client.ApiClient()
6969

70+
def get_pod_ips(api_client, namespace, pod_prefix):
71+
"""Get pod ip by pod name prefix.
72+
73+
Args:
74+
api_client: ApiClient
75+
An kubernetes ApiClient object, initialized with the client args.
76+
namespace: str
77+
Namespace of the pod belongs to.
78+
pod_prefix: str
79+
Pod name prefix.
80+
81+
Raises:
82+
RuntimeError: Get pod ip failed.
83+
84+
Returns:
85+
Pod ip.
86+
"""
87+
from kubernetes import client as kube_client
88+
core_api = kube_client.CoreV1Api(api_client)
89+
pods = core_api.list_namespaced_pod(namespace=namespace)
90+
ips = []
91+
for pod in pods.items:
92+
if pod.metadata.name.startswith(pod_prefix):
93+
if pod.status.phase == "Running":
94+
# append (ip, pod_name)
95+
ips.append((pod.status.pod_ip, pod.metadata.name))
96+
else:
97+
raise RuntimeError(f"Pod {pod.metadata.name} is not running.")
98+
if not ips:
99+
raise RuntimeError(f"Get pod ip failed.")
100+
return ips
101+
70102

71103
def get_service_endpoints( # noqa: C901
72104
api_client,

interactive_engine/assembly/src/bin/groot/store_ctl.sh

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ start_server() {
7070
-XX:NumberOfGCLogFiles=4
7171
-XX:GCLogFileSize=64m"
7272
export RUST_BACKTRACE=full
73+
java -cp ${libpath} com.alibaba.graphscope.groot.servers.GrootGraphDaemon &
7374
java ${java_opt} \
7475
-Dlogback.configurationFile="${GROOT_LOGBACK_FILE}" \
7576
-Dconfig.file="${GROOT_CONF_FILE}" \
Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
package com.alibaba.graphscope.groot.servers;
2+
3+
import org.slf4j.Logger;
4+
import org.slf4j.LoggerFactory;
5+
6+
import java.io.BufferedReader;
7+
import java.io.IOException;
8+
import java.io.InputStreamReader;
9+
import java.io.PrintWriter;
10+
import java.net.ServerSocket;
11+
import java.net.Socket;
12+
import java.nio.charset.StandardCharsets;
13+
import java.util.ArrayList;
14+
import java.util.List;
15+
import java.util.Optional;
16+
17+
public class GrootGraphDaemon {
18+
private static final Logger logger = LoggerFactory.getLogger(GrootGraphDaemon.class);
19+
private static final String GROOT_STORE_POD_ADMIN_PORT = "GROOT_STORE_POD_ADMIN_PORT";
20+
private static final int DEFAULT_PORT = 10001;
21+
private static final String GROOTGRAPH_NAME = "com.alibaba.graphscope.groot.servers.GrootGraph";
22+
23+
private int healthCheckPort;
24+
private volatile boolean isShuttingDown = false;
25+
26+
public GrootGraphDaemon() {
27+
this.healthCheckPort = getHealthCheckPort();
28+
}
29+
30+
private int getHealthCheckPort() {
31+
String port = System.getenv(GROOT_STORE_POD_ADMIN_PORT);
32+
if (port != null) {
33+
try {
34+
return Integer.parseInt(port);
35+
} catch (NumberFormatException e) {
36+
logger.error(
37+
"Failed to parse "
38+
+ GROOT_STORE_POD_ADMIN_PORT
39+
+ " from environment variable "
40+
+ port
41+
+ ", using default port "
42+
+ DEFAULT_PORT);
43+
return DEFAULT_PORT;
44+
}
45+
}
46+
return DEFAULT_PORT;
47+
}
48+
49+
public static void main(String[] args) {
50+
GrootGraphDaemon daemon = new GrootGraphDaemon();
51+
daemon.start();
52+
}
53+
54+
public void start() {
55+
try (ServerSocket serverSocket = new ServerSocket(healthCheckPort)) {
56+
logger.info("GrootGraph daemon started on port " + healthCheckPort);
57+
while (!isShuttingDown) {
58+
try (Socket clientSocket = serverSocket.accept()) {
59+
handleClientRequest(clientSocket);
60+
} catch (IOException e) {
61+
if (!isShuttingDown) {
62+
logger.error("Failed to accept client connection", e);
63+
}
64+
}
65+
}
66+
} catch (IOException e) {
67+
logger.error("Failed to start daemon", e);
68+
}
69+
}
70+
71+
private void handleClientRequest(Socket clientSocket) {
72+
try {
73+
// Read the request from client, only simple shutdown command expected
74+
byte[] buffer = new byte[1024];
75+
int readBytes = clientSocket.getInputStream().read(buffer);
76+
String request = new String(buffer, 0, readBytes, StandardCharsets.UTF_8).trim();
77+
78+
// Respond to client
79+
PrintWriter out = new PrintWriter(clientSocket.getOutputStream(), true);
80+
81+
if (request.toLowerCase().contains("shutdown")) {
82+
isShuttingDown = true;
83+
// Shutdown GrootGraph
84+
shutdownGrootGraph();
85+
// Respond to client
86+
out.println("HTTP/1.0 200 OK");
87+
out.println("Content-Type: text/plain");
88+
out.println();
89+
out.println("GrootGraph has been shutdown successfully.");
90+
out.flush();
91+
isShuttingDown = false;
92+
} else {
93+
out.println("HTTP/1.0 400 Bad Request");
94+
out.println("Content-Type: text/plain");
95+
out.println();
96+
out.println("unknown command, only 'shutdown' is supported");
97+
out.flush();
98+
}
99+
} catch (IOException e) {
100+
e.printStackTrace();
101+
}
102+
}
103+
104+
private void shutdownGrootGraph() {
105+
try {
106+
// Execute the command to get the PID of the GrootGraph process
107+
String[] cmd = {
108+
"/bin/sh", "-c", "ps -ef | grep -w " + GROOTGRAPH_NAME + " | grep -v grep"
109+
};
110+
Process process = Runtime.getRuntime().exec(cmd);
111+
BufferedReader reader =
112+
new BufferedReader(new InputStreamReader(process.getInputStream()));
113+
List<Long> pids = new ArrayList<>();
114+
String line;
115+
while ((line = reader.readLine()) != null) {
116+
String[] parts = line.trim().split("\\s+");
117+
if (parts.length > 1) {
118+
try {
119+
long pid = Long.parseLong(parts[1]);
120+
pids.add(pid);
121+
} catch (NumberFormatException e) {
122+
logger.error("Failed to parse PID from: " + line);
123+
}
124+
}
125+
}
126+
reader.close();
127+
128+
for (Long pid : pids) {
129+
Optional<ProcessHandle> grootGraphProcess = ProcessHandle.of(pid);
130+
if (grootGraphProcess.isPresent()) {
131+
ProcessHandle processHandle = grootGraphProcess.get();
132+
processHandle.destroy();
133+
if (processHandle.isAlive()) {
134+
processHandle.onExit().join();
135+
}
136+
logger.debug(
137+
"GrootGraph process with PID "
138+
+ pid
139+
+ " has been shut down successfully.");
140+
}
141+
}
142+
143+
} catch (Exception e) {
144+
logger.error("Failed to shutdown GrootGraph", e);
145+
}
146+
}
147+
}

0 commit comments

Comments
 (0)