Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ClientNode implementation round 3 #1711

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/general/src/util/Cancelable.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/**
* @license
* Copyright 2022-2024 Matter.js Authors
* Copyright 2022-2025 Matter.js Authors
* SPDX-License-Identifier: Apache-2.0
*/

Expand Down
36 changes: 18 additions & 18 deletions packages/general/src/util/DataReadQueue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,19 @@ import { createPromise } from "./Promises.js";
import { EndOfStreamError, NoResponseTimeoutError, Stream } from "./Stream.js";

export class DataReadQueue<T> implements Stream<T> {
private readonly queue = new Array<T>();
private pendingRead?: { resolver: (data: T) => void; rejecter: (reason: any) => void; timeoutTimer?: Timer };
private closed = false;
readonly #queue = new Array<T>();
#pendingRead?: { resolver: (data: T) => void; rejecter: (reason: any) => void; timeoutTimer?: Timer };
#closed = false;

async read(timeoutMs = 60_000): Promise<T> {
const { promise, resolver, rejecter } = createPromise<T>();
if (this.closed) throw new EndOfStreamError();
const data = this.queue.shift();
if (this.#closed) throw new EndOfStreamError();
const data = this.#queue.shift();
if (data !== undefined) {
return data;
}
if (this.pendingRead !== undefined) throw new MatterFlowError("Only one pending read is supported");
this.pendingRead = {
if (this.#pendingRead !== undefined) throw new MatterFlowError("Only one pending read is supported");
this.#pendingRead = {
resolver,
rejecter,
timeoutTimer: Time.getTimer("Queue timeout", timeoutMs, () =>
Expand All @@ -35,21 +35,21 @@ export class DataReadQueue<T> implements Stream<T> {
}

async write(data: T) {
if (this.closed) throw new EndOfStreamError();
if (this.pendingRead !== undefined) {
this.pendingRead.timeoutTimer?.stop();
this.pendingRead.resolver(data);
this.pendingRead = undefined;
if (this.#closed) throw new EndOfStreamError();
if (this.#pendingRead !== undefined) {
this.#pendingRead.timeoutTimer?.stop();
this.#pendingRead.resolver(data);
this.#pendingRead = undefined;
return;
}
this.queue.push(data);
this.#queue.push(data);
}

close() {
if (this.closed) return;
this.closed = true;
if (this.pendingRead === undefined) return;
this.pendingRead.timeoutTimer?.stop();
this.pendingRead.rejecter(new EndOfStreamError());
if (this.#closed) return;
this.#closed = true;
if (this.#pendingRead === undefined) return;
this.#pendingRead.timeoutTimer?.stop();
this.#pendingRead.rejecter(new EndOfStreamError());
}
}
4 changes: 2 additions & 2 deletions packages/general/src/util/Stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,15 @@
* Copyright 2022-2025 Matter.js Authors
* SPDX-License-Identifier: Apache-2.0
*/
import { MatterError } from "../MatterError.js";
import { MatterError, TimeoutError } from "../MatterError.js";

export class EndOfStreamError extends MatterError {
constructor(message = "Unexpected end of stream") {
super(message);
}
}

export class NoResponseTimeoutError extends MatterError {}
export class NoResponseTimeoutError extends TimeoutError {}

export interface Stream<T> {
read(): Promise<T>;
Expand Down
15 changes: 11 additions & 4 deletions packages/matter.js/src/MatterController.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import {
FabricBuilder,
FabricManager,
InstanceBroadcaster,
InteractionClientProvider,
NodeDiscoveryType,
OperationalPeer,
PeerAddress,
Expand Down Expand Up @@ -225,6 +226,7 @@ export class MatterController {
private readonly channelManager = new ChannelManager(CONTROLLER_CONNECTIONS_PER_FABRIC_AND_NODE);
private readonly exchangeManager: ExchangeManager;
private readonly peers: PeerSet;
private readonly clients: InteractionClientProvider;
private readonly commissioner: ControllerCommissioner;
#construction: Construction<MatterController>;

Expand Down Expand Up @@ -283,13 +285,15 @@ export class MatterController {
this.sessionClosedCallback?.(session.peerNodeId);
});

const subscriptionClient = new SubscriptionClient();

this.exchangeManager = new ExchangeManager({
sessionManager: this.sessionManager,
channelManager: this.channelManager,
transportInterfaces: this.netInterfaces,
});
this.exchangeManager.addProtocolHandler(new SecureChannelProtocol(this.sessionManager, fabricManager));
this.exchangeManager.addProtocolHandler(new SubscriptionClient());
this.exchangeManager.addProtocolHandler(subscriptionClient);

// Adapts the historical storage format for MatterController to OperationalPeer objects
this.nodesStore = new CommissionedNodeStore(controllerStore, fabric);
Expand All @@ -298,13 +302,16 @@ export class MatterController {
sessions: this.sessionManager,
channels: this.channelManager,
exchanges: this.exchangeManager,
subscriptionClient,
scanners: this.scanners,
netInterfaces: this.netInterfaces,
store: this.nodesStore,
});

this.clients = new InteractionClientProvider(this.peers);

this.commissioner = new ControllerCommissioner({
peers: this.peers,
clients: this.clients,
scanners: this.scanners,
netInterfaces: this.netInterfaces,
exchanges: this.exchangeManager,
Expand Down Expand Up @@ -494,11 +501,11 @@ export class MatterController {
* Returns a InteractionClient on success.
*/
async connect(peerNodeId: NodeId, discoveryOptions: DiscoveryOptions, allowUnknownPeer?: boolean) {
return this.peers.connect(this.fabric.addressOf(peerNodeId), discoveryOptions, allowUnknownPeer);
return this.clients.connect(this.fabric.addressOf(peerNodeId), discoveryOptions, allowUnknownPeer);
}

createInteractionClient(peerNodeId: NodeId, discoveryOptions: DiscoveryOptions) {
return this.peers.initializeInteractionClient(this.fabric.addressOf(peerNodeId), discoveryOptions);
return this.clients.getInteractionClient(this.fabric.addressOf(peerNodeId), discoveryOptions);
}

async getNextAvailableSessionId() {
Expand Down
60 changes: 7 additions & 53 deletions packages/matter.js/src/device/DeviceInformation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,13 @@ import {
ThreadNetworkDiagnostics,
} from "#clusters";
import { Logger, SupportedStorageTypes } from "@matter/general";
import { InteractionClient, SupportedAttributeClient } from "@matter/protocol";
import { InteractionClient, PhysicalDeviceProperties, SupportedAttributeClient } from "@matter/protocol";
import { EndpointNumber, GlobalAttributes, NodeId, TypeFromPartialBitSchema, TypeFromSchema } from "@matter/types";
import { Endpoint } from "./Endpoint.js";

const logger = Logger.get("DeviceInformation");

export type DeviceMetaInformation = {
threadConnected: boolean;
wifiConnected: boolean;
ethernetConnected: boolean;
rootEndpointServerList: number[];
isBatteryPowered: boolean;
isIntermittentlyConnected: boolean;
isThreadSleepyEndDevice: boolean;
export type DeviceMetaInformation = PhysicalDeviceProperties & {
dataRevision: number;
};

Expand All @@ -39,13 +32,6 @@ export type DeviceInformationData = {

export const DEVICE_DATA_REVISION = 1;

const DEFAULT_SUBSCRIPTION_FLOOR_DEFAULT_S = 1;
const DEFAULT_SUBSCRIPTION_FLOOR_ICD_S = 0;
const DEFAULT_SUBSCRIPTION_CEILING_WIFI_S = 60;
const DEFAULT_SUBSCRIPTION_CEILING_THREAD_S = 60;
const DEFAULT_SUBSCRIPTION_CEILING_THREAD_SLEEPY_S = 180;
const DEFAULT_SUBSCRIPTION_CEILING_BATTERY_POWERED_S = 600;

const GlobalAttributeKeys = Object.keys(GlobalAttributes({}));

export class DeviceInformation {
Expand Down Expand Up @@ -315,43 +301,11 @@ export class DeviceInformation {
subscribeMinIntervalFloorSeconds?: number;
subscribeMaxIntervalCeilingSeconds?: number;
}) {
let {
subscribeMinIntervalFloorSeconds: minIntervalFloorSeconds,
subscribeMaxIntervalCeilingSeconds: maxIntervalCeilingSeconds,
} = options;

const { isBatteryPowered, isIntermittentlyConnected, threadConnected, isThreadSleepyEndDevice } =
this.#deviceMeta ?? {};

if (isIntermittentlyConnected) {
if (minIntervalFloorSeconds !== undefined && minIntervalFloorSeconds !== DEFAULT_SUBSCRIPTION_FLOOR_ICD_S) {
logger.info(
`Node ${this.nodeId}: Overwriting minIntervalFloorSeconds for intermittently connected device to 0`,
);
minIntervalFloorSeconds = DEFAULT_SUBSCRIPTION_FLOOR_ICD_S;
}
}

const defaultCeiling = isBatteryPowered
? DEFAULT_SUBSCRIPTION_CEILING_BATTERY_POWERED_S
: isThreadSleepyEndDevice
? DEFAULT_SUBSCRIPTION_CEILING_THREAD_SLEEPY_S
: threadConnected
? DEFAULT_SUBSCRIPTION_CEILING_THREAD_S
: DEFAULT_SUBSCRIPTION_CEILING_WIFI_S;
if (maxIntervalCeilingSeconds === undefined) {
maxIntervalCeilingSeconds = defaultCeiling;
}
if (maxIntervalCeilingSeconds < defaultCeiling) {
logger.debug(
`Node ${this.nodeId}: maxIntervalCeilingSeconds should idealy be set to ${defaultCeiling}s instead of ${maxIntervalCeilingSeconds}s because of device type`,
);
}

return {
minIntervalFloorSeconds: minIntervalFloorSeconds ?? DEFAULT_SUBSCRIPTION_FLOOR_DEFAULT_S,
maxIntervalCeilingSeconds,
};
return PhysicalDeviceProperties.determineSubscriptionParameters({
properties: this.#deviceMeta,
description: `Node ${this.nodeId}`,
...options,
});
}

toStorageData(): DeviceInformationData {
Expand Down
6 changes: 6 additions & 0 deletions packages/node/src/behavior/context/ActionContext.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import type { EndpointType } from "#endpoint/type/EndpointType.js";
import type { AccessLevel } from "#model";
import type { Message, SecureSession } from "#protocol";
import { MessageExchange } from "#protocol";
import { Priority } from "#types";
import type { ValueSupervisor } from "../supervision/ValueSupervisor.js";
import { NodeActivity } from "./NodeActivity.js";
import type { OfflineContext } from "./server/OfflineContext.js";
Expand Down Expand Up @@ -57,6 +58,11 @@ export interface ActionContext extends ValueSupervisor.Session {
*/
activity?: NodeActivity.Activity;

/**
* The priority of actions in this context.
*/
priority?: Priority;

/**
* Obtain an agent for interacting with an endpoint in this context.
*/
Expand Down
30 changes: 24 additions & 6 deletions packages/node/src/behavior/state/managed/values/StructManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ export namespace StructManager {

function configureProperty(supervisor: RootSupervisor, schema: ValueModel) {
const name = camelize(schema.name);
const id = schema.id;

const { access, manage, validate } = supervisor.get(schema);

Expand All @@ -189,7 +190,15 @@ function configureProperty(supervisor: RootSupervisor, schema: ValueModel) {
set(this: Struct, value: Val) {
access.authorizeWrite(this[Internal.session], this[Internal.reference].location);

const oldValue = this[Internal.reference].value[name];
// We allow attribute/field name or id as key. If name is present id is ignored
let storedKey =
name in this[Internal.reference].value
? name
: id !== undefined && id in this[Internal.reference]
? id
: name;

const oldValue = this[Internal.reference].value[storedKey];

const self = this;

Expand All @@ -198,12 +207,13 @@ function configureProperty(supervisor: RootSupervisor, schema: ValueModel) {

// Identify the target. Usually just "struct" except when struct supports Val.Dynamic
let target;
if ((struct as Val.Dynamic)[Val.properties]) {
if (Val.properties in struct) {
const properties = (struct as Val.Dynamic)[Val.properties](
this[Internal.reference].rootOwner,
this[Internal.session],
);
if (name in properties) {
storedKey = name;
target = properties;
} else {
target = struct;
Expand All @@ -218,7 +228,7 @@ function configureProperty(supervisor: RootSupervisor, schema: ValueModel) {
}

// Modify the value
if (fabricScopedList && Array.isArray(value) && Array.isArray(target[name])) {
if (fabricScopedList && Array.isArray(value) && Array.isArray(target[storedKey])) {
// In the case of fabric-scoped write to established list we use the managed proxy to perform update
// as it will sort through values and only modify those with correct fabricIndex
const proxy = self[name] as Val.List;
Expand All @@ -228,7 +238,7 @@ function configureProperty(supervisor: RootSupervisor, schema: ValueModel) {
proxy.length = value.length;
} else {
// Direct assignment
target[name] = value;
target[storedKey] = value;
}

if (!this[Internal.session].acceptInvalid && validate) {
Expand All @@ -248,7 +258,7 @@ function configureProperty(supervisor: RootSupervisor, schema: ValueModel) {
} catch (e) {
// Undo our change on error. Rollback will take care of this when transactional but this
// handles the cases of 1.) no transaction, and 2.) error is caught within transaction
target[name] = oldValue;
target[storedKey] = oldValue;

throw e;
}
Expand All @@ -275,7 +285,12 @@ function configureProperty(supervisor: RootSupervisor, schema: ValueModel) {
}
}

return struct[name];
const value = struct[name];
if (value !== undefined || id === undefined) {
return value;
}

return struct[id];
}
};
} else {
Expand Down Expand Up @@ -320,6 +335,9 @@ function configureProperty(supervisor: RootSupervisor, schema: ValueModel) {
}
} else {
value = struct[name];
if (value === undefined && id !== undefined) {
value = struct[id];
}
}

// Note that we only mask values that are unreadable. This is appropriate when the parent object is
Expand Down
Loading
Loading