Skip to content

Commit 5bfae18

Browse files
committed
refactor: unify cross-repo storage on edges table
Migrate the messaging-protocol cross-project matcher from a separate _crosslinks.db file to bidirectional CROSS_* edges in each project's edges table. Add 11 new CROSS_* edge type constants for messaging protocols (KAFKA, SQS, SNS, EVENTBRIDGE, PUBSUB, AMQP, MQTT, NATS, REDIS_PUBSUB, WS, SSE). Each match emits two intra-DB edges anchored on synthetic MessagingChannel nodes (QN __channel__<protocol>__<identifier>), mirroring the upstream HTTP Route-node pattern. Producer DB gets function -> channel; consumer DB gets channel -> function. Cross-project metadata lives in edge properties JSON. The matcher now skips http/grpc/graphql/trpc protocols entirely; those are owned by the upstream Route-QN matcher in pass_cross_repo.c.
1 parent 6eafb10 commit 5bfae18

4 files changed

Lines changed: 740 additions & 687 deletions

File tree

src/pipeline/pass_cross_repo.c

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,42 @@ enum {
3939
#define CR_MS_PER_SEC 1000.0
4040
#define CR_NS_PER_MS 1000000.0
4141

42+
/* ── Messaging protocol → CROSS_* edge type mapping ──────────────
43+
*
44+
* Upstream owns HTTP/gRPC/GraphQL/tRPC via Route-QN matching, so those
45+
* protocols are intentionally absent from this mapping: the messaging
46+
* matcher in pass_crossrepolinks.c skips them via
47+
* cbm_messaging_protocol_to_cross_edge() returning NULL, leaving upstream
48+
* as the sole source of their CROSS_* edges.
49+
*/
50+
const char *const CBM_MESSAGING_CROSS_EDGE_TYPES[CBM_MESSAGING_CROSS_EDGE_TYPE_COUNT] = {
51+
CBM_EDGE_CROSS_KAFKA_CALLS, CBM_EDGE_CROSS_SQS_CALLS,
52+
CBM_EDGE_CROSS_SNS_CALLS, CBM_EDGE_CROSS_EVENTBRIDGE_CALLS,
53+
CBM_EDGE_CROSS_PUBSUB_CALLS, CBM_EDGE_CROSS_AMQP_CALLS,
54+
CBM_EDGE_CROSS_MQTT_CALLS, CBM_EDGE_CROSS_NATS_CALLS,
55+
CBM_EDGE_CROSS_REDIS_PUBSUB_CALLS, CBM_EDGE_CROSS_WS_CALLS,
56+
CBM_EDGE_CROSS_SSE_CALLS,
57+
};
58+
59+
const char *cbm_messaging_protocol_to_cross_edge(const char *protocol) {
60+
if (!protocol) {
61+
return NULL;
62+
}
63+
if (strcmp(protocol, "kafka") == 0) return CBM_EDGE_CROSS_KAFKA_CALLS;
64+
if (strcmp(protocol, "sqs") == 0) return CBM_EDGE_CROSS_SQS_CALLS;
65+
if (strcmp(protocol, "sns") == 0) return CBM_EDGE_CROSS_SNS_CALLS;
66+
if (strcmp(protocol, "eventbridge") == 0) return CBM_EDGE_CROSS_EVENTBRIDGE_CALLS;
67+
if (strcmp(protocol, "pubsub") == 0) return CBM_EDGE_CROSS_PUBSUB_CALLS;
68+
if (strcmp(protocol, "rabbitmq") == 0) return CBM_EDGE_CROSS_AMQP_CALLS;
69+
if (strcmp(protocol, "amqp") == 0) return CBM_EDGE_CROSS_AMQP_CALLS;
70+
if (strcmp(protocol, "mqtt") == 0) return CBM_EDGE_CROSS_MQTT_CALLS;
71+
if (strcmp(protocol, "nats") == 0) return CBM_EDGE_CROSS_NATS_CALLS;
72+
if (strcmp(protocol, "redis_pubsub") == 0) return CBM_EDGE_CROSS_REDIS_PUBSUB_CALLS;
73+
if (strcmp(protocol, "ws") == 0) return CBM_EDGE_CROSS_WS_CALLS;
74+
if (strcmp(protocol, "sse") == 0) return CBM_EDGE_CROSS_SSE_CALLS;
75+
return NULL;
76+
}
77+
4278
/* TLS buffer for integer-to-string in log calls. */
4379
static CBM_TLS char cr_ibuf[CBM_SZ_32];
4480
static const char *cr_itoa(int v) {

src/pipeline/pass_cross_repo.h

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,41 @@
77

88
#include "store/store.h"
99

10+
/* ── CROSS_* edge type names ─────────────────────────────────────
11+
*
12+
* Upstream Route-QN matcher emits the first six. The messaging matcher
13+
* (pass_crossrepolinks.c) emits the remaining eleven.
14+
*/
15+
#define CBM_EDGE_CROSS_HTTP_CALLS "CROSS_HTTP_CALLS"
16+
#define CBM_EDGE_CROSS_ASYNC_CALLS "CROSS_ASYNC_CALLS"
17+
#define CBM_EDGE_CROSS_CHANNEL "CROSS_CHANNEL"
18+
#define CBM_EDGE_CROSS_GRPC_CALLS "CROSS_GRPC_CALLS"
19+
#define CBM_EDGE_CROSS_GRAPHQL_CALLS "CROSS_GRAPHQL_CALLS"
20+
#define CBM_EDGE_CROSS_TRPC_CALLS "CROSS_TRPC_CALLS"
21+
22+
#define CBM_EDGE_CROSS_KAFKA_CALLS "CROSS_KAFKA_CALLS"
23+
#define CBM_EDGE_CROSS_SQS_CALLS "CROSS_SQS_CALLS"
24+
#define CBM_EDGE_CROSS_SNS_CALLS "CROSS_SNS_CALLS"
25+
#define CBM_EDGE_CROSS_EVENTBRIDGE_CALLS "CROSS_EVENTBRIDGE_CALLS"
26+
#define CBM_EDGE_CROSS_PUBSUB_CALLS "CROSS_PUBSUB_CALLS"
27+
#define CBM_EDGE_CROSS_AMQP_CALLS "CROSS_AMQP_CALLS"
28+
#define CBM_EDGE_CROSS_MQTT_CALLS "CROSS_MQTT_CALLS"
29+
#define CBM_EDGE_CROSS_NATS_CALLS "CROSS_NATS_CALLS"
30+
#define CBM_EDGE_CROSS_REDIS_PUBSUB_CALLS "CROSS_REDIS_PUBSUB_CALLS"
31+
#define CBM_EDGE_CROSS_WS_CALLS "CROSS_WS_CALLS"
32+
#define CBM_EDGE_CROSS_SSE_CALLS "CROSS_SSE_CALLS"
33+
34+
/* All messaging CROSS_* edge types produced by pass_crossrepolinks.c.
35+
* Used for idempotent cleanup before re-emission and for MCP queries. */
36+
extern const char *const CBM_MESSAGING_CROSS_EDGE_TYPES[];
37+
#define CBM_MESSAGING_CROSS_EDGE_TYPE_COUNT 11
38+
39+
/* Map a messaging protocol name (e.g. "kafka") to its CROSS_* edge type
40+
* constant (e.g. "CROSS_KAFKA_CALLS"). Returns NULL for unknown/skipped
41+
* protocols ("http", "grpc", "graphql", "trpc" are owned by the upstream
42+
* Route-QN matcher and intentionally return NULL here). */
43+
const char *cbm_messaging_protocol_to_cross_edge(const char *protocol);
44+
1045
/* Result of a cross-repo matching run. */
1146
typedef struct {
1247
int http_edges; /* CROSS_HTTP_CALLS edges created */

0 commit comments

Comments
 (0)