Skip to content
This repository has been archived by the owner on Nov 25, 2024. It is now read-only.

Commit

Permalink
Remove unneeded TopicFor
Browse files Browse the repository at this point in the history
  • Loading branch information
S7evinK committed Jul 24, 2021
1 parent a833f57 commit f5a4e4a
Show file tree
Hide file tree
Showing 15 changed files with 22 additions and 22 deletions.
2 changes: 1 addition & 1 deletion appservice/consumers/roomserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func NewOutputRoomEventConsumer(
consumer := internal.ContinualConsumer{
Process: process,
ComponentName: "appservice/roomserver",
Topic: cfg.Global.JetStream.TopicFor(jetstream.OutputRoomEvent),
Topic: jetstream.OutputRoomEvent,
Consumer: kafkaConsumer,
PartitionStore: appserviceDB,
}
Expand Down
2 changes: 1 addition & 1 deletion clientapi/clientapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func AddPublicRoutes(

syncProducer := &producers.SyncAPIProducer{
Producer: producer,
Topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputClientData),
Topic: jetstream.OutputClientData,
}

routing.Setup(
Expand Down
6 changes: 3 additions & 3 deletions eduserver/eduserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,9 @@ func NewInternalAPI(
Cache: eduCache,
UserAPI: userAPI,
Producer: producer,
OutputTypingEventTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputTypingEvent),
OutputSendToDeviceEventTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputSendToDeviceEvent),
OutputReceiptEventTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputReceiptEvent),
OutputTypingEventTopic: jetstream.OutputTypingEvent,
OutputSendToDeviceEventTopic: jetstream.OutputSendToDeviceEvent,
OutputReceiptEventTopic: jetstream.OutputReceiptEvent,
ServerName: cfg.Matrix.ServerName,
}
}
10 changes: 5 additions & 5 deletions federationsender/consumers/eduserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,29 +56,29 @@ func NewOutputEDUConsumer(
typingConsumer: &internal.ContinualConsumer{
Process: process,
ComponentName: "eduserver/typing",
Topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputTypingEvent),
Topic: jetstream.OutputTypingEvent,
Consumer: kafkaConsumer,
PartitionStore: store,
},
sendToDeviceConsumer: &internal.ContinualConsumer{
Process: process,
ComponentName: "eduserver/sendtodevice",
Topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputSendToDeviceEvent),
Topic: jetstream.OutputSendToDeviceEvent,
Consumer: kafkaConsumer,
PartitionStore: store,
},
receiptConsumer: &internal.ContinualConsumer{
Process: process,
ComponentName: "eduserver/receipt",
Topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputReceiptEvent),
Topic: jetstream.OutputReceiptEvent,
Consumer: kafkaConsumer,
PartitionStore: store,
},
queues: queues,
db: store,
ServerName: cfg.Matrix.ServerName,
TypingTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputTypingEvent),
SendToDeviceTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputSendToDeviceEvent),
TypingTopic: jetstream.OutputTypingEvent,
SendToDeviceTopic: jetstream.OutputSendToDeviceEvent,
}
c.typingConsumer.ProcessMessage = c.onTypingEvent
c.sendToDeviceConsumer.ProcessMessage = c.onSendToDeviceEvent
Expand Down
2 changes: 1 addition & 1 deletion federationsender/consumers/keychange.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func NewKeyChangeConsumer(
consumer: &internal.ContinualConsumer{
Process: process,
ComponentName: "federationsender/keychange",
Topic: string(cfg.Matrix.JetStream.TopicFor(jetstream.OutputKeyChangeEvent)),
Topic: jetstream.OutputKeyChangeEvent,
Consumer: kafkaConsumer,
PartitionStore: store,
},
Expand Down
2 changes: 1 addition & 1 deletion federationsender/consumers/roomserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func NewOutputRoomEventConsumer(
consumer := internal.ContinualConsumer{
Process: process,
ComponentName: "federationsender/roomserver",
Topic: string(cfg.Matrix.JetStream.TopicFor(jetstream.OutputRoomEvent)),
Topic: jetstream.OutputRoomEvent,
Consumer: kafkaConsumer,
PartitionStore: store,
}
Expand Down
2 changes: 1 addition & 1 deletion keyserver/keyserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func NewInternalAPI(
logrus.WithError(err).Panicf("failed to connect to key server database")
}
keyChangeProducer := &producers.KeyChange{
Topic: string(cfg.Matrix.JetStream.TopicFor(jetstream.OutputKeyChangeEvent)),
Topic: jetstream.OutputKeyChangeEvent,
Producer: producer,
DB: db,
}
Expand Down
2 changes: 1 addition & 1 deletion roomserver/roomserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func NewInternalAPI(
}

return internal.NewRoomserverAPI(
cfg, roomserverDB, producer, string(cfg.Matrix.JetStream.TopicFor(jetstream.OutputRoomEvent)),
cfg, roomserverDB, producer, jetstream.OutputRoomEvent,
base.Caches, keyRing, perspectiveServerNames,
)
}
4 changes: 2 additions & 2 deletions roomserver/roomserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func mustCreateRoomserverAPI(t *testing.T) (api.RoomserverInternalAPI, *dummyPro
ConnectionString: roomserverDBFileURI,
}
dp := &dummyProducer{
topic: cfg.Global.JetStream.TopicFor(jetstream.OutputRoomEvent),
topic: jetstream.OutputRoomEvent,
}
cache, err := caching.NewInMemoryLRUCache(false)
if err != nil {
Expand All @@ -181,7 +181,7 @@ func mustCreateRoomserverAPI(t *testing.T) (api.RoomserverInternalAPI, *dummyPro
logrus.WithError(err).Panicf("failed to connect to room server db")
}
return internal.NewRoomserverAPI(
&cfg.RoomServer, roomserverDB, dp, string(cfg.Global.JetStream.TopicFor(jetstream.OutputRoomEvent)),
&cfg.RoomServer, roomserverDB, dp, jetstream.OutputRoomEvent,
base.Caches, &test.NopJSONVerifier{}, nil,
), dp
}
Expand Down
2 changes: 1 addition & 1 deletion syncapi/consumers/clientapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func NewOutputClientDataConsumer(
consumer := internal.ContinualConsumer{
Process: process,
ComponentName: "syncapi/clientapi",
Topic: string(cfg.Matrix.JetStream.TopicFor(jetstream.OutputClientData)),
Topic: jetstream.OutputClientData,
Consumer: kafkaConsumer,
PartitionStore: store,
}
Expand Down
2 changes: 1 addition & 1 deletion syncapi/consumers/eduserver_receipts.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func NewOutputReceiptEventConsumer(
consumer := internal.ContinualConsumer{
Process: process,
ComponentName: "syncapi/eduserver/receipt",
Topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputReceiptEvent),
Topic: jetstream.OutputReceiptEvent,
Consumer: kafkaConsumer,
PartitionStore: store,
}
Expand Down
2 changes: 1 addition & 1 deletion syncapi/consumers/eduserver_sendtodevice.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func NewOutputSendToDeviceEventConsumer(
consumer := internal.ContinualConsumer{
Process: process,
ComponentName: "syncapi/eduserver/sendtodevice",
Topic: string(cfg.Matrix.JetStream.TopicFor(jetstream.OutputSendToDeviceEvent)),
Topic: jetstream.OutputSendToDeviceEvent,
Consumer: kafkaConsumer,
PartitionStore: store,
}
Expand Down
2 changes: 1 addition & 1 deletion syncapi/consumers/eduserver_typing.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func NewOutputTypingEventConsumer(
consumer := internal.ContinualConsumer{
Process: process,
ComponentName: "syncapi/eduserver/typing",
Topic: string(cfg.Matrix.JetStream.TopicFor(jetstream.OutputTypingEvent)),
Topic: jetstream.OutputTypingEvent,
Consumer: kafkaConsumer,
PartitionStore: store,
}
Expand Down
2 changes: 1 addition & 1 deletion syncapi/consumers/roomserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func NewOutputRoomEventConsumer(
consumer := internal.ContinualConsumer{
Process: process,
ComponentName: "syncapi/roomserver",
Topic: string(cfg.Matrix.JetStream.TopicFor(jetstream.OutputRoomEvent)),
Topic: jetstream.OutputRoomEvent,
Consumer: kafkaConsumer,
PartitionStore: store,
}
Expand Down
2 changes: 1 addition & 1 deletion syncapi/syncapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func AddPublicRoutes(
requestPool := sync.NewRequestPool(syncDB, cfg, userAPI, keyAPI, rsAPI, streams, notifier)

keyChangeConsumer := consumers.NewOutputKeyChangeEventConsumer(
process, cfg.Matrix.ServerName, string(cfg.Matrix.JetStream.TopicFor(jetstream.OutputKeyChangeEvent)),
process, cfg.Matrix.ServerName, jetstream.OutputKeyChangeEvent,
consumer, keyAPI, rsAPI, syncDB, notifier, streams.DeviceListStreamProvider,
)
if err = keyChangeConsumer.Start(); err != nil {
Expand Down

2 comments on commit f5a4e4a

@neilalexander
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see you've removed cfg.Matrix.JetStream.TopicFor() but that now means that the topic_prefix in the config are ignored? (That's what that function is for.)

@S7evinK
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TopicFor is still used in setup/jetstream/nats.go when creating the streams.
But your probably right, didn't think this through. Now we're creating prefixed streams but no prefixed subjects, which will cause issues when actual using multiple instances.
Will revert that.

Please sign in to comment.