Skip to content

Commit

Permalink
refactor MsgDispatcherService, created new device SS processor service
Browse files Browse the repository at this point in the history
  • Loading branch information
dmytro-landiak committed Jan 27, 2025
1 parent 7c74ab7 commit de1c34e
Show file tree
Hide file tree
Showing 5 changed files with 262 additions and 132 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,14 @@
import org.springframework.util.CollectionUtils;
import org.thingsboard.mqtt.broker.actors.client.service.subscription.SubscriptionService;
import org.thingsboard.mqtt.broker.adaptor.ProtoConverter;
import org.thingsboard.mqtt.broker.common.data.ClientInfo;
import org.thingsboard.mqtt.broker.common.data.ClientSessionInfo;
import org.thingsboard.mqtt.broker.common.data.ClientType;
import org.thingsboard.mqtt.broker.common.data.MqttQoS;
import org.thingsboard.mqtt.broker.common.data.SessionInfo;
import org.thingsboard.mqtt.broker.common.data.subscription.SubscriptionOptions;
import org.thingsboard.mqtt.broker.common.data.util.StringUtils;
import org.thingsboard.mqtt.broker.common.stats.MessagesStats;
import org.thingsboard.mqtt.broker.gen.queue.QueueProtos.PublishMsgProto;
import org.thingsboard.mqtt.broker.queue.TbQueueCallback;
import org.thingsboard.mqtt.broker.queue.cluster.ServiceInfoProvider;
import org.thingsboard.mqtt.broker.queue.common.DefaultTbQueueMsgHeaders;
import org.thingsboard.mqtt.broker.queue.common.TbProtoQueueMsg;
import org.thingsboard.mqtt.broker.service.analysis.ClientLogger;
Expand All @@ -46,18 +43,15 @@
import org.thingsboard.mqtt.broker.service.processing.data.MsgSubscriptions;
import org.thingsboard.mqtt.broker.service.processing.data.PersistentMsgSubscriptions;
import org.thingsboard.mqtt.broker.service.processing.downlink.DownLinkProxy;
import org.thingsboard.mqtt.broker.service.processing.shared.DeviceSharedSubscriptionProcessor;
import org.thingsboard.mqtt.broker.service.stats.StatsManager;
import org.thingsboard.mqtt.broker.service.stats.timer.PublishMsgProcessingTimerStats;
import org.thingsboard.mqtt.broker.service.subscription.ClientSubscription;
import org.thingsboard.mqtt.broker.service.subscription.Subscription;
import org.thingsboard.mqtt.broker.service.subscription.ValueWithTopicFilter;
import org.thingsboard.mqtt.broker.service.subscription.shared.SharedSubscription;
import org.thingsboard.mqtt.broker.service.subscription.shared.SharedSubscriptionCacheService;
import org.thingsboard.mqtt.broker.service.subscription.shared.SharedSubscriptionProcessingStrategy;
import org.thingsboard.mqtt.broker.service.subscription.shared.SharedSubscriptionProcessingStrategyFactory;
import org.thingsboard.mqtt.broker.service.subscription.shared.SharedSubscriptions;
import org.thingsboard.mqtt.broker.service.subscription.shared.TopicSharedSubscription;
import org.thingsboard.mqtt.broker.util.ClientSessionInfoFactory;
import org.thingsboard.mqtt.broker.util.MqttPropertiesUtil;

import java.util.ArrayList;
Expand All @@ -67,7 +61,6 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static org.thingsboard.mqtt.broker.common.data.BrokerConstants.DROPPED_MSGS;
import static org.thingsboard.mqtt.broker.common.data.BrokerConstants.INCOMING_MSGS;
Expand All @@ -84,10 +77,9 @@ public class MsgDispatcherServiceImpl implements MsgDispatcherService {
private final DownLinkProxy downLinkProxy;
private final ClientLogger clientLogger;
private final PublishMsgQueuePublisher publishMsgQueuePublisher;
private final SharedSubscriptionProcessingStrategyFactory sharedSubscriptionProcessingStrategyFactory;
private final DeviceSharedSubscriptionProcessor deviceSharedSubscriptionProcessor;
private final SharedSubscriptionCacheService sharedSubscriptionCacheService;
private final TbMessageStatsReportClient tbMessageStatsReportClient;
private final ServiceInfoProvider serviceInfoProvider;
private final RateLimitService rateLimitService;

private MessagesStats producerStats;
Expand Down Expand Up @@ -260,7 +252,7 @@ MsgSubscriptions getAllSubscriptionsForPubMsg(PublishMsgProto publishMsgProto, S
return new MsgSubscriptions(
collectCommonSubscriptions(commonClientSubscriptions, senderClientId),
sharedSubscriptions == null ? null : sharedSubscriptions.getApplicationSubscriptions(),
getTargetDeviceSharedSubscriptions(sharedSubscriptions, publishMsgProto.getQos())
sharedSubscriptions == null ? null : deviceSharedSubscriptionProcessor.getTargetSubscriptions(sharedSubscriptions.getDeviceSubscriptions(), publishMsgProto.getQos())
);
} else {
return new MsgSubscriptions(
Expand Down Expand Up @@ -301,23 +293,6 @@ private Set<TopicSharedSubscription> addSubscription(ValueWithTopicFilter<Client
return topicSharedSubscriptions;
}

private List<Subscription> getTargetDeviceSharedSubscriptions(SharedSubscriptions sharedSubscriptions, int qos) {
if (sharedSubscriptions == null || CollectionUtils.isEmpty(sharedSubscriptions.getDeviceSubscriptions())) {
return null;
}
List<SharedSubscription> sharedSubscriptionList = toSharedSubscriptionList(sharedSubscriptions.getDeviceSubscriptions());
return collectOneSubscriptionFromEveryDeviceSharedSubscription(sharedSubscriptionList, qos);
}

List<SharedSubscription> toSharedSubscriptionList(Set<Subscription> sharedSubscriptions) {
return sharedSubscriptions.stream()
.collect(Collectors.groupingBy(subscription ->
new TopicSharedSubscription(subscription.getTopicFilter(), subscription.getShareName(), subscription.getQos())))
.entrySet().stream()
.map(entry -> new SharedSubscription(entry.getKey(), entry.getValue()))
.collect(Collectors.toList());
}

private List<Subscription> collectCommonSubscriptions(
List<ValueWithTopicFilter<ClientSubscription>> clientSubscriptionWithTopicFilterList, String senderClientId) {

Expand All @@ -333,60 +308,6 @@ private List<Subscription> collectCommonSubscriptions(
return msgSubscriptions;
}

private List<Subscription> collectOneSubscriptionFromEveryDeviceSharedSubscription(List<SharedSubscription> sharedSubscriptions, int qos) {
List<Subscription> result = new ArrayList<>(sharedSubscriptions.size());
for (SharedSubscription sharedSubscription : sharedSubscriptions) {
result.add(getSubscription(sharedSubscription, qos));
}
return result;
}

private Subscription getSubscription(SharedSubscription sharedSubscription, int qos) {
Subscription anyActive = findAnyConnectedSubscription(sharedSubscription.getSubscriptions());
if (anyActive == null) {
log.info("[{}] No active subscription found for shared subscription - all are persisted and disconnected", sharedSubscription.getTopicSharedSubscription());
return createDummySubscription(sharedSubscription, qos);
} else {
SharedSubscriptionProcessingStrategy strategy = sharedSubscriptionProcessingStrategyFactory.newInstance();
return strategy.analyze(sharedSubscription);
}
}

Subscription findAnyConnectedSubscription(List<Subscription> subscriptions) {
if (CollectionUtils.isEmpty(subscriptions)) {
return null;
}
return subscriptions
.stream()
.filter(subscription -> subscription.getClientSessionInfo().isConnected())
.findAny()
.orElse(null);
}

private Subscription createDummySubscription(SharedSubscription sharedSubscription, int qos) {
return new Subscription(
sharedSubscription.getTopicSharedSubscription().getTopicFilter(),
qos,
createDummyClientSession(sharedSubscription),
sharedSubscription.getTopicSharedSubscription().getShareName(),
SubscriptionOptions.newInstance(),
-1
);
}

private ClientSessionInfo createDummyClientSession(SharedSubscription sharedSubscription) {
ClientInfo clientInfo = ClientSessionInfoFactory.getClientInfo(sharedSubscription.getTopicSharedSubscription().getKey());
return ClientSessionInfo.builder()
.connected(false)
.serviceId(serviceInfoProvider.getServiceId())
.clientId(clientInfo.getClientId())
.type(clientInfo.getType())
.clientIpAdr(clientInfo.getClientIpAdr())
.cleanStart(false)
.sessionExpiryInterval(1000)
.build();
}

List<Subscription> collectSubscriptions(
List<ValueWithTopicFilter<ClientSubscription>> clientSubscriptionWithTopicFilterList, String senderClientId) {
Map<String, Subscription> map = Maps.newHashMapWithExpectedSize(clientSubscriptionWithTopicFilterList.size());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/**
* Copyright © 2016-2025 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.mqtt.broker.service.processing.shared;

import org.thingsboard.mqtt.broker.service.subscription.Subscription;

import java.util.List;
import java.util.Set;

public interface DeviceSharedSubscriptionProcessor {

/**
* Processes device shared subscriptions to identify the target device subscriptions for shared topics.
*
* This method takes a set of device subscriptions and identifies one subscription
* from each shared subscription group.
*
* @param deviceSubscriptions A set of device subscriptions for shared topics. May be empty or null.
* @param qos The quality of service level of published message to apply for the identified subscriptions.
* @return A list of target subscriptions, one per shared subscription group. Returns null if input subscriptions are empty or null.
*/
List<Subscription> getTargetSubscriptions(Set<Subscription> deviceSubscriptions, int qos);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/**
* Copyright © 2016-2025 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.mqtt.broker.service.processing.shared;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import org.thingsboard.mqtt.broker.common.data.ClientInfo;
import org.thingsboard.mqtt.broker.common.data.ClientSessionInfo;
import org.thingsboard.mqtt.broker.common.data.subscription.SubscriptionOptions;
import org.thingsboard.mqtt.broker.queue.cluster.ServiceInfoProvider;
import org.thingsboard.mqtt.broker.service.subscription.Subscription;
import org.thingsboard.mqtt.broker.service.subscription.shared.SharedSubscription;
import org.thingsboard.mqtt.broker.service.subscription.shared.SharedSubscriptionProcessingStrategy;
import org.thingsboard.mqtt.broker.service.subscription.shared.SharedSubscriptionProcessingStrategyFactory;
import org.thingsboard.mqtt.broker.service.subscription.shared.TopicSharedSubscription;
import org.thingsboard.mqtt.broker.util.ClientSessionInfoFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

@Service
@Slf4j
@RequiredArgsConstructor
public class DeviceSharedSubscriptionProcessorImpl implements DeviceSharedSubscriptionProcessor {

private final ServiceInfoProvider serviceInfoProvider;
private final SharedSubscriptionProcessingStrategyFactory sharedSubscriptionProcessingStrategyFactory;

@Override
public List<Subscription> getTargetSubscriptions(Set<Subscription> deviceSubscriptions, int qos) {
if (CollectionUtils.isEmpty(deviceSubscriptions)) {
return null;
}
List<SharedSubscription> sharedSubscriptionList = toSharedSubscriptionList(deviceSubscriptions);
return collectOneSubscriptionFromEveryDeviceSharedSubscription(sharedSubscriptionList, qos);
}

List<SharedSubscription> toSharedSubscriptionList(Set<Subscription> sharedSubscriptions) {
return sharedSubscriptions.stream()
.collect(Collectors.groupingBy(subscription ->
new TopicSharedSubscription(subscription.getTopicFilter(), subscription.getShareName(), subscription.getQos())))
.entrySet().stream()
.map(entry -> new SharedSubscription(entry.getKey(), entry.getValue()))
.collect(Collectors.toList());
}

private List<Subscription> collectOneSubscriptionFromEveryDeviceSharedSubscription(List<SharedSubscription> sharedSubscriptions, int qos) {
List<Subscription> result = new ArrayList<>(sharedSubscriptions.size());
for (SharedSubscription sharedSubscription : sharedSubscriptions) {
result.add(getSubscription(sharedSubscription, qos));
}
return result;
}

private Subscription getSubscription(SharedSubscription sharedSubscription, int qos) {
Subscription anyActive = findAnyConnectedSubscription(sharedSubscription.getSubscriptions());
if (anyActive == null) {
log.info("[{}] No active subscription found for shared subscription - all are persisted and disconnected", sharedSubscription.getTopicSharedSubscription());
return createDummySubscription(sharedSubscription, qos);
} else {
SharedSubscriptionProcessingStrategy strategy = sharedSubscriptionProcessingStrategyFactory.newInstance();
return strategy.analyze(sharedSubscription);
}
}

Subscription findAnyConnectedSubscription(List<Subscription> subscriptions) {
if (CollectionUtils.isEmpty(subscriptions)) {
return null;
}
return subscriptions
.stream()
.filter(subscription -> subscription.getClientSessionInfo().isConnected())
.findAny()
.orElse(null);
}

private Subscription createDummySubscription(SharedSubscription sharedSubscription, int qos) {
return new Subscription(
sharedSubscription.getTopicSharedSubscription().getTopicFilter(),
qos,
createDummyClientSession(sharedSubscription),
sharedSubscription.getTopicSharedSubscription().getShareName(),
SubscriptionOptions.newInstance(),
-1
);
}

private ClientSessionInfo createDummyClientSession(SharedSubscription sharedSubscription) {
ClientInfo clientInfo = ClientSessionInfoFactory.getClientInfo(sharedSubscription.getTopicSharedSubscription().getKey());
return ClientSessionInfo.builder()
.connected(false)
.serviceId(serviceInfoProvider.getServiceId())
.clientId(clientInfo.getClientId())
.type(clientInfo.getType())
.clientIpAdr(clientInfo.getClientIpAdr())
.cleanStart(false)
.sessionExpiryInterval(1000)
.build();
}

}
Loading

0 comments on commit de1c34e

Please sign in to comment.