Skip to content

Commit fbc4cd3

Browse files
committed
fix Kafka FindCoordinator and Metadata preferring host from spec by port
1 parent f95d7db commit fbc4cd3

23 files changed

Lines changed: 274 additions & 145 deletions

api/handler_kafka.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -420,7 +420,9 @@ func getKafka(info *runtime.KafkaInfo) kafkaInfo {
420420
}
421421
}
422422

423-
for name, s := range info.Servers {
423+
for it := info.Servers.Iter(); it.Next(); {
424+
name := it.Key()
425+
s := it.Value()
424426
if s == nil || s.Value == nil {
425427
continue
426428
}

config/dynamic/asyncApi/convert.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"fmt"
55
"mokapi/config/dynamic"
66
"mokapi/providers/asyncapi3"
7+
"mokapi/sortedmap"
78
"net/url"
89
"path"
910
"strings"
@@ -316,15 +317,15 @@ func convertServers(cfg *asyncapi3.Config, servers map[string]*ServerRef) {
316317
return
317318
}
318319
if cfg.Servers == nil {
319-
cfg.Servers = map[string]*asyncapi3.ServerRef{}
320+
cfg.Servers = &sortedmap.LinkedHashMap[string, *asyncapi3.ServerRef]{}
320321
}
321322

322323
for name, orig := range servers {
323324
if len(orig.Ref) > 0 {
324-
cfg.Servers[name] = &asyncapi3.ServerRef{Reference: dynamic.Reference{Ref: orig.Ref}}
325+
cfg.Servers.Set(name, &asyncapi3.ServerRef{Reference: dynamic.Reference{Ref: orig.Ref}})
325326
}
326327
if orig.Value != nil {
327-
cfg.Servers[name] = convertServer(orig.Value)
328+
cfg.Servers.Set(name, convertServer(orig.Value))
328329
}
329330
}
330331
}

config/dynamic/asyncApi/convert_test.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -40,10 +40,10 @@ func TestConfig_Convert(t *testing.T) {
4040
require.Equal(t, "https://www.apache.org/licenses/LICENSE-2.0", cfg3.Info.License.Url)
4141

4242
// Server
43-
require.Len(t, cfg3.Servers, 1)
44-
require.Equal(t, "test.mosquitto.org:{port}", cfg3.Servers["production"].Value.Host)
45-
require.Equal(t, "kafka", cfg3.Servers["production"].Value.Protocol)
46-
require.Equal(t, "Test broker", cfg3.Servers["production"].Value.Description)
43+
require.Equal(t, cfg3.Servers.Len(), 1)
44+
require.Equal(t, "test.mosquitto.org:{port}", cfg3.Servers.Lookup("production").Value.Host)
45+
require.Equal(t, "kafka", cfg3.Servers.Lookup("production").Value.Protocol)
46+
require.Equal(t, "Test broker", cfg3.Servers.Lookup("production").Value.Description)
4747

4848
// Channel
4949
channel := cfg3.Channels["smartylighting/streetlights/1/0/event/{streetlightId}/lighting/measured"].Value
@@ -90,9 +90,9 @@ func TestServer_Convert(t *testing.T) {
9090
cfg: asyncapitest.NewConfig(asyncapitest.WithServer("foo", "kafka", "mokapi-service:9092")),
9191
test: func(t *testing.T, config *asyncapi3.Config, err error) {
9292
require.NoError(t, err)
93-
require.Len(t, config.Servers, 1)
94-
require.Equal(t, "mokapi-service:9092", config.Servers["foo"].Value.Host)
95-
require.Equal(t, "kafka", config.Servers["foo"].Value.Protocol)
93+
require.Equal(t, config.Servers.Len(), 1)
94+
require.Equal(t, "mokapi-service:9092", config.Servers.Lookup("foo").Value.Host)
95+
require.Equal(t, "kafka", config.Servers.Lookup("foo").Value.Protocol)
9696
},
9797
},
9898
}

docs/guides/kafka/overview.md

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,8 +90,26 @@ Use Mokapi Scripts to simulate edge cases that are difficult to trigger in a rea
9090

9191
To ensure speed and determinism, Mokapi simulates Kafka's application behavior rather than its cluster administration:
9292

93-
- <p><strong>Single Stable Broker:</strong><br/>Focuses on message flow rather than leader election, partition replication, or broker coordination.
94-
- <p><strong>Ephemeral by Design:</strong><br/>Data is kept in-memory to provide lightning-fast feedback loops during development.
93+
- <p><strong>Single Stable Broker:</strong><br/>Focuses on message flow rather than
94+
leader election, partition replication, or broker coordination.
95+
- <p><strong>Ephemeral by Design:</strong><br/>Data is kept in-memory to provide
96+
lightning-fast feedback loops during development.
97+
- <p><strong>Deterministic Broker Address Resolution:</strong><br/>
98+
Mokapi resolves the advertised broker address based on the listener port.
99+
If multiple AsyncAPI servers share the same port, the first matching server
100+
definition is used. AsyncAPI servers are treated as environment-specific
101+
configurations rather than simultaneously addressable brokers.</p>
102+
103+
Kafka clients do not transmit the DNS name used to establish a connection.
104+
When multiple servers share the same listener port, it is therefore not
105+
possible to determine which server was used at runtime. Mokapi follows
106+
Kafka’s networking model and applies a deterministic resolution strategy
107+
instead of guessing.
108+
109+
``` box=tip title=Recommendation
110+
If you define multiple AsyncAPI servers, use different ports for each server
111+
or use Mokapi’s [patching](/docs/configuration/patching.md) mechanism.
112+
```
95113

96114
## Next Steps
97115

providers/asyncapi3/asyncapi3test/config.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package asyncapi3test
22

33
import (
44
"mokapi/providers/asyncapi3"
5+
"mokapi/sortedmap"
56
)
67

78
type ConfigOptions func(c *asyncapi3.Config)
@@ -10,7 +11,7 @@ func NewConfig(opts ...ConfigOptions) *asyncapi3.Config {
1011
c := &asyncapi3.Config{
1112
Version: "2.0.0",
1213
Info: asyncapi3.Info{Name: "test", Version: "1.0"},
13-
Servers: map[string]*asyncapi3.ServerRef{},
14+
Servers: &sortedmap.LinkedHashMap[string, *asyncapi3.ServerRef]{},
1415
DefaultContentType: asyncapi3.DefaultContentType,
1516
}
1617
for _, opt := range opts {
@@ -57,7 +58,7 @@ func WithContact(name, url, mail string) ConfigOptions {
5758
func WithServer(name, protocol, host string, opts ...ServerOptions) ConfigOptions {
5859
return func(c *asyncapi3.Config) {
5960
if c.Servers == nil {
60-
c.Servers = make(map[string]*asyncapi3.ServerRef)
61+
c.Servers = &sortedmap.LinkedHashMap[string, *asyncapi3.ServerRef]{}
6162
}
6263

6364
s := &asyncapi3.Server{
@@ -68,7 +69,7 @@ func WithServer(name, protocol, host string, opts ...ServerOptions) ConfigOption
6869
opt(s)
6970
}
7071

71-
c.Servers[name] = &asyncapi3.ServerRef{Value: s}
72+
c.Servers.Set(name, &asyncapi3.ServerRef{Value: s})
7273
}
7374
}
7475

providers/asyncapi3/bindings_kafka_test.go

Lines changed: 17 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
package asyncapi3_test
22

33
import (
4-
"github.com/stretchr/testify/require"
5-
"gopkg.in/yaml.v3"
64
"mokapi/providers/asyncapi3"
75
"testing"
6+
7+
"github.com/stretchr/testify/require"
8+
"gopkg.in/yaml.v3"
89
)
910

1011
func TestKafkaBindingsServer_Yaml(t *testing.T) {
@@ -24,7 +25,7 @@ servers:
2425
`,
2526
test: func(t *testing.T, config *asyncapi3.Config, err error) {
2627
require.NoError(t, err)
27-
require.Equal(t, int64(10), config.Servers["test"].Value.Bindings.Kafka.LogRetentionBytes)
28+
require.Equal(t, int64(10), config.Servers.Lookup("test").Value.Bindings.Kafka.LogRetentionBytes)
2829
},
2930
},
3031
{
@@ -51,7 +52,7 @@ servers:
5152
`,
5253
test: func(t *testing.T, config *asyncapi3.Config, err error) {
5354
require.NoError(t, err)
54-
require.Equal(t, int64(10), config.Servers["test"].Value.Bindings.Kafka.LogRetentionMs)
55+
require.Equal(t, int64(10), config.Servers.Lookup("test").Value.Bindings.Kafka.LogRetentionMs)
5556
},
5657
},
5758
{
@@ -78,7 +79,7 @@ servers:
7879
`,
7980
test: func(t *testing.T, config *asyncapi3.Config, err error) {
8081
require.NoError(t, err)
81-
require.Equal(t, int64(600000), config.Servers["test"].Value.Bindings.Kafka.LogRetentionMs)
82+
require.Equal(t, int64(600000), config.Servers.Lookup("test").Value.Bindings.Kafka.LogRetentionMs)
8283
},
8384
},
8485
{
@@ -105,7 +106,7 @@ servers:
105106
`,
106107
test: func(t *testing.T, config *asyncapi3.Config, err error) {
107108
require.NoError(t, err)
108-
require.Equal(t, int64(36000000), config.Servers["test"].Value.Bindings.Kafka.LogRetentionMs)
109+
require.Equal(t, int64(36000000), config.Servers.Lookup("test").Value.Bindings.Kafka.LogRetentionMs)
109110
},
110111
},
111112
{
@@ -132,7 +133,7 @@ servers:
132133
`,
133134
test: func(t *testing.T, config *asyncapi3.Config, err error) {
134135
require.NoError(t, err)
135-
require.Equal(t, int64(10), config.Servers["test"].Value.Bindings.Kafka.LogRetentionCheckIntervalMs)
136+
require.Equal(t, int64(10), config.Servers.Lookup("test").Value.Bindings.Kafka.LogRetentionCheckIntervalMs)
136137
},
137138
},
138139
{
@@ -159,7 +160,7 @@ servers:
159160
`,
160161
test: func(t *testing.T, config *asyncapi3.Config, err error) {
161162
require.NoError(t, err)
162-
require.Equal(t, int64(10), config.Servers["test"].Value.Bindings.Kafka.LogSegmentDeleteDelayMs)
163+
require.Equal(t, int64(10), config.Servers.Lookup("test").Value.Bindings.Kafka.LogSegmentDeleteDelayMs)
163164
},
164165
},
165166
{
@@ -186,7 +187,7 @@ servers:
186187
`,
187188
test: func(t *testing.T, config *asyncapi3.Config, err error) {
188189
require.NoError(t, err)
189-
require.Equal(t, int64(10), config.Servers["test"].Value.Bindings.Kafka.LogRollMs)
190+
require.Equal(t, int64(10), config.Servers.Lookup("test").Value.Bindings.Kafka.LogRollMs)
190191
},
191192
},
192193
{
@@ -213,7 +214,7 @@ servers:
213214
`,
214215
test: func(t *testing.T, config *asyncapi3.Config, err error) {
215216
require.NoError(t, err)
216-
require.Equal(t, int64(600000), config.Servers["test"].Value.Bindings.Kafka.LogRollMs)
217+
require.Equal(t, int64(600000), config.Servers.Lookup("test").Value.Bindings.Kafka.LogRollMs)
217218
},
218219
},
219220
{
@@ -240,7 +241,7 @@ servers:
240241
`,
241242
test: func(t *testing.T, config *asyncapi3.Config, err error) {
242243
require.NoError(t, err)
243-
require.Equal(t, int64(36000000), config.Servers["test"].Value.Bindings.Kafka.LogRollMs)
244+
require.Equal(t, int64(36000000), config.Servers.Lookup("test").Value.Bindings.Kafka.LogRollMs)
244245
},
245246
},
246247
{
@@ -267,7 +268,7 @@ servers:
267268
`,
268269
test: func(t *testing.T, config *asyncapi3.Config, err error) {
269270
require.NoError(t, err)
270-
require.Equal(t, int64(10), config.Servers["test"].Value.Bindings.Kafka.LogSegmentBytes)
271+
require.Equal(t, int64(10), config.Servers.Lookup("test").Value.Bindings.Kafka.LogSegmentBytes)
271272
},
272273
},
273274
{
@@ -294,7 +295,7 @@ servers:
294295
`,
295296
test: func(t *testing.T, config *asyncapi3.Config, err error) {
296297
require.NoError(t, err)
297-
require.Equal(t, int64(10), config.Servers["test"].Value.Bindings.Kafka.GroupInitialRebalanceDelayMs)
298+
require.Equal(t, int64(10), config.Servers.Lookup("test").Value.Bindings.Kafka.GroupInitialRebalanceDelayMs)
298299
},
299300
},
300301
{
@@ -321,7 +322,7 @@ servers:
321322
`,
322323
test: func(t *testing.T, config *asyncapi3.Config, err error) {
323324
require.NoError(t, err)
324-
require.Equal(t, int64(10), config.Servers["test"].Value.Bindings.Kafka.GroupMinSessionTimeoutMs)
325+
require.Equal(t, int64(10), config.Servers.Lookup("test").Value.Bindings.Kafka.GroupMinSessionTimeoutMs)
325326
},
326327
},
327328
{
@@ -347,7 +348,7 @@ servers:
347348
schemaRegistryUrl: foo.bar
348349
`,
349350
test: func(t *testing.T, config *asyncapi3.Config, err error) {
350-
require.Equal(t, "foo.bar", config.Servers["test"].Value.Bindings.Kafka.SchemaRegistryUrl)
351+
require.Equal(t, "foo.bar", config.Servers.Lookup("test").Value.Bindings.Kafka.SchemaRegistryUrl)
351352
},
352353
},
353354
{
@@ -360,7 +361,7 @@ servers:
360361
schemaRegistryVendor: foo
361362
`,
362363
test: func(t *testing.T, config *asyncapi3.Config, err error) {
363-
require.Equal(t, "foo", config.Servers["test"].Value.Bindings.Kafka.SchemaRegistryVendor)
364+
require.Equal(t, "foo", config.Servers.Lookup("test").Value.Bindings.Kafka.SchemaRegistryVendor)
364365
},
365366
},
366367
}

providers/asyncapi3/config.go

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package asyncapi3
22

33
import (
44
"mokapi/config/dynamic"
5+
"mokapi/sortedmap"
56

67
"gopkg.in/yaml.v3"
78
)
@@ -17,7 +18,7 @@ type Config struct {
1718
// Default content type to use when encoding/decoding a message's payload.
1819
DefaultContentType string `yaml:"defaultContentType" json:"defaultContentType"`
1920

20-
Servers map[string]*ServerRef `yaml:"servers" json:"servers"`
21+
Servers *sortedmap.LinkedHashMap[string, *ServerRef] `yaml:"servers" json:"servers"`
2122

2223
Channels map[string]*ChannelRef
2324
Operations map[string]*OperationRef `yaml:"operations" json:"operations"`
@@ -47,15 +48,18 @@ type License struct {
4748
}
4849

4950
func (c *Config) Parse(config *dynamic.Config, reader dynamic.Reader) error {
50-
for _, server := range c.Servers {
51-
if len(server.Ref) > 0 {
52-
return dynamic.Resolve(server.Ref, &server.Value, config, reader)
53-
}
54-
if server.Value == nil {
55-
return nil
56-
}
57-
if err := server.parse(config, reader); err != nil {
58-
return err
51+
if c.Servers != nil {
52+
for it := c.Servers.Iter(); it.Next(); {
53+
server := it.Value()
54+
if len(server.Ref) > 0 {
55+
return dynamic.Resolve(server.Ref, &server.Value, config, reader)
56+
}
57+
if server.Value == nil {
58+
return nil
59+
}
60+
if err := server.parse(config, reader); err != nil {
61+
return err
62+
}
5963
}
6064
}
6165

@@ -110,7 +114,8 @@ func (c *Config) HasKafkaServer() bool {
110114
if c == nil {
111115
return false
112116
}
113-
for _, server := range c.Servers {
117+
for it := c.Servers.Iter(); it.Next(); {
118+
server := it.Value()
114119
if server.Value.Protocol == "kafka" {
115120
return true
116121
}

providers/asyncapi3/config_test.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,6 @@ package asyncapi3_test
22

33
import (
44
"encoding/json"
5-
"github.com/stretchr/testify/require"
6-
"gopkg.in/yaml.v3"
75
"mokapi/config/dynamic"
86
"mokapi/config/dynamic/dynamictest"
97
"mokapi/providers/asyncapi3"
@@ -15,6 +13,9 @@ import (
1513
"os"
1614
"strings"
1715
"testing"
16+
17+
"github.com/stretchr/testify/require"
18+
"gopkg.in/yaml.v3"
1819
)
1920

2021
func TestConfig3_Schema(t *testing.T) {
@@ -71,13 +72,13 @@ func TestStreetlightKafka(t *testing.T) {
7172
require.Equal(t, "application/json", cfg.DefaultContentType)
7273

7374
// Server
74-
require.Len(t, cfg.Servers, 2)
75-
server := cfg.Servers["scram-connections"]
75+
require.Equal(t, cfg.Servers.Len(), 2)
76+
server := cfg.Servers.Lookup("scram-connections")
7677
require.Equal(t, "test.mykafkacluster.org:18092", server.Value.Host)
7778
require.Equal(t, "kafka-secure", server.Value.Protocol)
7879
require.Equal(t, "Test broker secured with scramSha256", server.Value.Description)
7980

80-
server = cfg.Servers["mtls-connections"]
81+
server = cfg.Servers.Lookup("mtls-connections")
8182
require.Equal(t, "test.mykafkacluster.org:28092", server.Value.Host)
8283
require.Equal(t, "kafka-secure", server.Value.Protocol)
8384
require.Equal(t, "Test broker secured with X509", server.Value.Description)

providers/asyncapi3/kafka/store/find_coordinator.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,10 @@ func (s *Store) findCoordinator(rw kafka.ResponseWriter, req *kafka.Request) err
2121
switch r.KeyType {
2222
case findCoordinator.KeyTypeGroup:
2323
host, port := parseHostAndPort(req.Host)
24+
b := s.getBrokerByPort(req.Host)
25+
if b != nil && b.Host != "" {
26+
host = b.Host
27+
}
2428
// Mokapi does no leader management: always return fixed node id
2529
res.NodeId = 0
2630
res.Host = host

0 commit comments

Comments
 (0)