Skip to content

Commit a56d4f9

Browse files
intermamy-ship-it
authored and committed
Reuse the epoll object to fix the dispatcher performance downgrade (#14800)
In commit e30909f, I used the "WaitEventSet" to refactor the related dispatch logic. But in the following pgbench test, we found the performance downgrade 50% due to this commit. Did a perf on it, the bottleneck is native_queued_spin_lock_slowpath, means there is a lot of lock contention in it. After investigation, we found the culprit is a large number of concurrent creation and destruction of epoll objects. Fix it in the commit: reuse the epoll object.And here are the pgbench results for reference: ``` ./pgbench tpcb -j 200 -c 200 -n -P 1 -h mdw -p 5432 -U gpadmin -T 60 -S // main tps = 48662.387634 (including connections establishing) tps = 48697.870330 (excluding connections establishing) // revert e30909f ... tps = 151641.552024 (including connections establishing) tps = 151757.591140 (excluding connections establishing) // main with this commit (fixed) ... tps = 148574.817871 (including connections establishing) tps = 148666.308622 (excluding connections establishing) // 6x ... tps = 114706.905303 (including connections establishing) tps = 114795.896798 (excluding connections establishing) ```
1 parent 61cdc19 commit a56d4f9

3 files changed

Lines changed: 132 additions & 28 deletions

File tree

contrib/interconnect/udp/ic_udpifc.c

Lines changed: 51 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -385,6 +385,8 @@ struct SendControlInfo
385385
* and congestion control.
386386
*/
387387
static SendControlInfo snd_control_info;
388+
/* WaitEventSet for the icudp */
389+
static WaitEventSet *ICWaitSet = NULL;
388390

389391
/*
390392
* ICGlobalControlInfo
@@ -779,6 +781,8 @@ static inline void logPkt(char *prefix, icpkthdr *pkt);
779781
static void aggregateStatistics(ChunkTransportStateEntry *pChunkEntry);
780782

781783
static inline bool pollAcks(ChunkTransportState *transportStates, int fd, int timeout);
784+
static TupleChunkListItem receiveChunksUDPIFCLoop(ChunkTransportState *pTransportStates, ChunkTransportStateEntry *pEntry,
785+
int16 *srcRoute, MotionConn *conn, WaitEventSet *waitset, int nevent);
782786

783787
/* #define TRANSFER_PROTOCOL_STATS */
784788

@@ -3820,15 +3824,10 @@ static TupleChunkListItem
38203824
receiveChunksUDPIFC(ChunkTransportState *pTransportStates, ChunkTransportStateEntry *pEntry,
38213825
int16 motNodeID, int16 *srcRoute, MotionConn *mConn)
38223826
{
3823-
int retries = 0;
38243827
bool directed = false;
38253828
int nFds = 0;
38263829
int *waitFds = NULL;
38273830
int nevent = 0;
3828-
MotionConn *rxconn = NULL;
3829-
MotionConnUDP *udpRXconn = NULL;
3830-
WaitEvent *rEvents = NULL;
3831-
WaitEventSet *waitset = NULL;
38323831
TupleChunkListItem tcItem = NULL;
38333832
MotionConnUDP *conn = NULL;
38343833

@@ -3865,17 +3864,53 @@ receiveChunksUDPIFC(ChunkTransportState *pTransportStates, ChunkTransportStateEn
38653864

38663865
}
38673866

3868-
/* init WaitEventSet */
3869-
waitset = CreateWaitEventSet(CurrentMemoryContext, nevent);
3870-
rEvents = palloc(nevent * sizeof(WaitEvent)); /* returned events */
3871-
AddWaitEventToSet(waitset, WL_LATCH_SET, PGINVALID_SOCKET, &ic_control_info.latch, NULL);
3872-
AddWaitEventToSet(waitset, WL_POSTMASTER_DEATH, PGINVALID_SOCKET, NULL, NULL);
3867+
/* reset WaitEventSet */
3868+
ResetWaitEventSet(&ICWaitSet, TopMemoryContext, nevent);
3869+
3870+
/*
3871+
* Use PG_TRY() - PG_CATCH() to make sure the WaitEventSet is destroyed (closing the epoll fd)
3872+
* The main receive logic is in receiveChunksUDPIFCLoop()
3873+
*/
3874+
PG_TRY();
3875+
{
3876+
AddWaitEventToSet(ICWaitSet, WL_LATCH_SET, PGINVALID_SOCKET, &ic_control_info.latch, NULL);
3877+
AddWaitEventToSet(ICWaitSet, WL_POSTMASTER_DEATH, PGINVALID_SOCKET, NULL, NULL);
3878+
for (int i = 0; i < nFds; i++)
3879+
{
3880+
AddWaitEventToSet(ICWaitSet, WL_SOCKET_READABLE, waitFds[i], NULL, NULL);
3881+
}
38733882

3874-
for (int i = 0; i < nFds; i++)
3883+
tcItem = receiveChunksUDPIFCLoop(pTransportStates, pEntry, srcRoute, mConn, ICWaitSet, nevent);
3884+
}
3885+
PG_CATCH();
38753886
{
3876-
AddWaitEventToSet(waitset, WL_SOCKET_READABLE, waitFds[i], NULL, NULL);
3887+
if (waitFds != NULL)
3888+
pfree(waitFds);
3889+
PG_RE_THROW();
38773890
}
3891+
PG_END_TRY();
3892+
3893+
if (waitFds != NULL)
3894+
pfree(waitFds);
3895+
3896+
return tcItem;
3897+
}
3898+
3899+
static TupleChunkListItem
3900+
receiveChunksUDPIFCLoop(ChunkTransportState *pTransportStates, ChunkTransportStateEntry *pEntry,
3901+
int16 *srcRoute, MotionConn *conn, WaitEventSet *waitset, int nevent)
3902+
{
3903+
TupleChunkListItem tcItem = NULL;
3904+
MotionConn *rxconn = NULL;
3905+
MotionConnUDP *udpRXconn = NULL;
3906+
int retries = 0;
3907+
bool directed = false;
3908+
WaitEvent *rEvents = NULL;
38783909

3910+
if (conn != NULL)
3911+
directed = true;
3912+
3913+
rEvents = palloc(nevent * sizeof(WaitEvent)); /* returned events */
38793914
/* we didn't have any data, so we've got to read it from the network. */
38803915
for (;;)
38813916
{
@@ -3905,13 +3940,7 @@ receiveChunksUDPIFC(ChunkTransportState *pTransportStates, ChunkTransportStateEn
39053940

39063941
if (!directed)
39073942
*srcRoute = udpRXconn->route;
3908-
3909-
FreeWaitEventSet(waitset);
3910-
if (rEvents != NULL)
3911-
pfree(rEvents);
3912-
if (waitFds != NULL)
3913-
pfree(waitFds);
3914-
3943+
pfree(rEvents);
39153944
return tcItem;
39163945
}
39173946

@@ -3963,7 +3992,7 @@ receiveChunksUDPIFC(ChunkTransportState *pTransportStates, ChunkTransportStateEn
39633992
}
39643993

39653994
/*
3966-
* 1. NIC on master (and thus the QD connection) may become bad, check
3995+
* 1. NIC on coordinator (and thus the QD connection) may become bad, check
39673996
* it. 2. Postmaster may become invalid, check it
39683997
*/
39693998
if ((retries & 0x3f) == 0)
@@ -3978,19 +4007,13 @@ receiveChunksUDPIFC(ChunkTransportState *pTransportStates, ChunkTransportStateEn
39784007
}
39794008

39804009
pthread_mutex_lock(&ic_control_info.lock);
3981-
3982-
} /* for (;;) */
3983-
3984-
FreeWaitEventSet(waitset);
3985-
if (rEvents != NULL)
3986-
pfree(rEvents);
3987-
if (waitFds != NULL)
3988-
pfree(waitFds);
4010+
} /* for (;;) */
39894011

39904012
/* We either got data, or get cancelled. We never make it out to here. */
39914013
return NULL; /* make GCC behave */
39924014
}
39934015

4016+
39944017
TupleChunkListItem
39954018
RecvTupleChunkUDPIFC(MotionConn *conn, ChunkTransportState *transportStates)
39964019
{

src/backend/storage/ipc/latch.c

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -785,6 +785,84 @@ CreateWaitEventSet(MemoryContext context, int nevents)
785785
return set;
786786
}
787787

788+
/*
789+
* If *pset == NULL create a new WaitEventSet and set it to *pset
790+
* Else reset the existing WaitEventSet:
791+
* - if epoll is enabled, it frees the old memory and reuses the epoll object.
792+
* - else it just Free+Create a new WaitEventSet.
793+
*
794+
* Note: Why do we want to reuse the epoll object? The previous code created
795+
* and freed the WaitEventSet several times per query, and this led to a performance
796+
* downgrade in the pgbench test. After investigation, we found the culprit is
797+
* a large number of concurrent creations and destructions of epoll objects. So
798+
* we introduce this function to reuse the epoll object.
799+
*/
800+
void
801+
ResetWaitEventSet(WaitEventSet **pset, MemoryContext context, int nevents)
802+
{
803+
if (*pset == NULL)
804+
{
805+
*pset = CreateWaitEventSet(context, nevents);
806+
return;
807+
}
808+
809+
#if defined(WAIT_USE_EPOLL)
810+
WaitEventSet *set = *pset;
811+
/* delete existing events */
812+
for (int i = 0; i < set->nevents; i++)
813+
{
814+
WaitEvent *event = &(set->events[i]);
815+
int rc;
816+
rc = epoll_ctl(set->epoll_fd, EPOLL_CTL_DEL, event->fd, NULL);
817+
if (rc < 0)
818+
{
819+
ereport(DEBUG1,
820+
(errcode_for_socket_access(),
821+
/* translator: %s is a syscall name, such as "poll()" */
822+
errmsg("%s cleanup epoll events failed: %m",
823+
"epoll_ctl()")));
824+
/*
825+
* in some scenarios, the event's fd has been cleanup,
826+
* so need to destory the whold epoll object
827+
*/
828+
goto CREATE_NEW_EV;
829+
}
830+
}
831+
832+
/* the same alloc logic as CreateWaitEventSet() */
833+
char *data;
834+
Size sz = 0;
835+
sz += MAXALIGN(sizeof(WaitEventSet));
836+
sz += MAXALIGN(sizeof(WaitEvent) * nevents);
837+
sz += MAXALIGN(sizeof(struct epoll_event) * nevents);
838+
data = (char *) MemoryContextAllocZero(context, sz);
839+
840+
/* reuse the epoll object and free the old memory */
841+
int old_epoll_fd = set->epoll_fd;
842+
pfree(set);
843+
844+
set = (WaitEventSet *) data;
845+
data += MAXALIGN(sizeof(WaitEventSet));
846+
set->events = (WaitEvent *) data;
847+
data += MAXALIGN(sizeof(WaitEvent) * nevents);
848+
set->epoll_ret_events = (struct epoll_event *) data;
849+
850+
set->latch = NULL;
851+
set->nevents_space = nevents;
852+
set->exit_on_postmaster_death = false;
853+
854+
/* reuse the epoll object (is empty now) */
855+
set->epoll_fd = old_epoll_fd;
856+
857+
*pset = set;
858+
return;
859+
#endif
860+
CREATE_NEW_EV:
861+
FreeWaitEventSet(*pset);
862+
*pset = CreateWaitEventSet(context, nevents);
863+
864+
}
865+
788866
/*
789867
* Free a previously created WaitEventSet.
790868
*

src/include/storage/latch.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,5 +180,8 @@ extern int WaitLatchOrSocket(Latch *latch, int wakeEvents,
180180
pgsocket sock, long timeout, uint32 wait_event_info);
181181
extern void InitializeLatchWaitSet(void);
182182
extern int GetNumRegisteredWaitEvents(WaitEventSet *set);
183+
/* special function for gpdb */
184+
extern void ResetWaitEventSet(WaitEventSet **pset, MemoryContext context, int nevents);
185+
183186

184187
#endif /* LATCH_H */

0 commit comments

Comments
 (0)