From 09cd0b9e38a695ec47b3db7ed6a3ceaf019f7f78 Mon Sep 17 00:00:00 2001 From: Christoph Hagen Date: Sat, 30 Sep 2023 22:42:33 +0200 Subject: [PATCH] Replace remote observer logic with change callbacks --- README.md | 13 -- .../Extensions/URLSession+Extensions.swift | 27 ---- .../Clairvoyant/Metric/GenericMetric.swift | 3 - Sources/Clairvoyant/Metric/Metric.swift | 110 +++------------- Sources/Clairvoyant/Metric/MetricInfo.swift | 13 +- .../Clairvoyant/Metric/MetricObserver.swift | 5 +- Sources/Clairvoyant/Routes/ServerRoute.swift | 122 ------------------ 7 files changed, 20 insertions(+), 273 deletions(-) delete mode 100644 Sources/Clairvoyant/Extensions/URLSession+Extensions.swift delete mode 100644 Sources/Clairvoyant/Routes/ServerRoute.swift diff --git a/README.md b/README.md index 5eb8e19..d524519 100644 --- a/README.md +++ b/README.md @@ -144,19 +144,6 @@ try await metric.update(newPlayer) Logging values to disk is great, but the data should also be available for inspection and monitoring. Clairvoyant provides a separate package [ClairvoyantVapor](https://github.com/christophhagen/ClairvoyantVapor) to integrate metric access into Vapor servers. -### Receiving from other servers - -To receive a metric pushed from a remote server, configure a metric with `canBeUpdatedByRemote = true`. Any time a new value is received the metric will be updated with this value. -The last value as well as history data can be accessed as with any other metric. - -### Pushing to other servers - -Metrics can be configured to automatically transmit the logged values to a remote server for monitoring or persistence. -To configure this feature, one or more `RemoteMetricObserver`s can be added to each metric. -Whenever a new value is set, then the metric attempts to send all pending updates (including any previously failed values) to the remote observer using the `push` route specified above. - -The remote server must have a metric with the same `id` registered with the observer, and the metric must be configured with `canBeUpdatedByRemote = true`. - ## Initial requirements **Allow publishing of individual metrics** *Implemented* diff --git a/Sources/Clairvoyant/Extensions/URLSession+Extensions.swift b/Sources/Clairvoyant/Extensions/URLSession+Extensions.swift deleted file mode 100644 index 32fc657..0000000 --- a/Sources/Clairvoyant/Extensions/URLSession+Extensions.swift +++ /dev/null @@ -1,27 +0,0 @@ -#if canImport(FoundationNetworking) -import Foundation -import FoundationNetworking - -extension URLSession { - - func data(for request: URLRequest) async throws -> (Data, URLResponse) { - let result: Result<(response: URLResponse, data: Data), Error> = await withCheckedContinuation { continuation in - let task = dataTask(with: request) { data, response, error in - if let error { - continuation.resume(returning: .failure(error)) - } else { - continuation.resume(returning: .success((response!, data!))) - } - } - task.resume() - } - switch result { - case .failure(let error): - throw error - case .success(let result): - return (result.data, result.response) - } - } -} - -#endif diff --git a/Sources/Clairvoyant/Metric/GenericMetric.swift b/Sources/Clairvoyant/Metric/GenericMetric.swift index babf04e..335a3ee 100644 --- a/Sources/Clairvoyant/Metric/GenericMetric.swift +++ b/Sources/Clairvoyant/Metric/GenericMetric.swift @@ -12,9 +12,6 @@ public protocol GenericMetric { /// The information about the metric var info: MetricInfo { get } - /// Indicate that the metric value can be set via remote access - var canBeUpdatedByRemote: Bool { get } - /** Get the last value data of the metric. diff --git a/Sources/Clairvoyant/Metric/Metric.swift b/Sources/Clairvoyant/Metric/Metric.swift index 36c7817..c093cd5 100644 --- a/Sources/Clairvoyant/Metric/Metric.swift +++ b/Sources/Clairvoyant/Metric/Metric.swift @@ -3,6 +3,8 @@ import Foundation import FoundationNetworking #endif +public typealias MetricChangeCallback = (Timestamped) -> Void + /** A metric is a single piece of state that is provided by an application. Changes to the state can be used to update the metric, @@ -32,10 +34,7 @@ public actor Metric where T: MetricValue { private let fileWriter: LogFileWriter - /// Indicate if the metric can be updated by a remote user - public nonisolated var canBeUpdatedByRemote: Bool { - info.canBeUpdatedByRemote - } + private var changeCallbacks: [MetricChangeCallback] = [] /** Indicates that the metric writes values to disk locally. @@ -90,17 +89,15 @@ public actor Metric where T: MetricValue { - Parameter id: The unique id of the metric. - Parameter observer: The metric observer calling this initializer - - Parameter canBeUpdatedByRemote: Indicate if the metric can be set through the Web API - Parameter keepsLocalHistoryData: Indicate if the metric should persist the history to disk - Parameter name: A descriptive name of the metric - Parameter description: A textual description of the metric - Parameter fileSize: The maximum size of files in bytes */ - init(id: String, calledFromObserver observer: MetricObserver, canBeUpdatedByRemote: Bool, keepsLocalHistoryData: Bool, name: String?, description: String?, fileSize: Int) { + init(id: String, calledFromObserver observer: MetricObserver, keepsLocalHistoryData: Bool, name: String?, description: String?, fileSize: Int) { let info = MetricInfo( id: id, dataType: T.valueType, - canBeUpdatedByRemote: canBeUpdatedByRemote, keepsLocalHistoryData: keepsLocalHistoryData, name: name, description: description) @@ -138,11 +135,10 @@ public actor Metric where T: MetricValue { Create a new log metric for a `MetricObserver` - Note: This constructor does not link back to an observer for logging errors, since this would just divert back to the this metric again. */ - init(logId id: String, name: String?, description: String?, canBeUpdatedByRemote: Bool, keepsLocalHistoryData: Bool, logFolder: URL, encoder: BinaryEncoder, decoder: BinaryDecoder, fileSize: Int) { + init(logId id: String, name: String?, description: String?, keepsLocalHistoryData: Bool, logFolder: URL, encoder: BinaryEncoder, decoder: BinaryDecoder, fileSize: Int) { self.info = .init( id: id, dataType: T.valueType, - canBeUpdatedByRemote: canBeUpdatedByRemote, keepsLocalHistoryData: keepsLocalHistoryData, name: name, description: description) @@ -166,20 +162,18 @@ public actor Metric where T: MetricValue { - Parameter dataType: The raw type of the values contained in the metric - Parameter name: A descriptive name of the metric - Parameter description: A textual description of the metric - - Parameter canBeUpdatedByRemote: Indicate if the metric can be set through the Web API - Parameter keepsLocalHistoryData: Indicate if the metric should persist the history to disk - Parameter fileSize: The maximum size of files in bytes - Note: This initializer crashes with a `fatalError`, if `MetricObserver.standard` has not been set. - Note: This initializer crashes with a `fatalError`, if a metric with the same `id` is already registered with the observer. */ - public init(_ id: String, containing dataType: T.Type = T.self, name: String? = nil, description: String? = nil, canBeUpdatedByRemote: Bool = false, keepsLocalHistoryData: Bool = true, fileSize: Int = 10_000_000) { + public init(_ id: String, containing dataType: T.Type = T.self, name: String? = nil, description: String? = nil, keepsLocalHistoryData: Bool = true, fileSize: Int = 10_000_000) { guard let observer = MetricObserver.standard else { fatalError("Initialize the standard observer first by setting `MetricObserver.standard` before creating a metric") } let info = MetricInfo( id: id, dataType: T.valueType, - canBeUpdatedByRemote: canBeUpdatedByRemote, keepsLocalHistoryData: keepsLocalHistoryData, name: name, description: description) @@ -284,9 +278,7 @@ public actor Metric where T: MetricValue { try await fileWriter.write(value) } _lastValue = value - // TODO: Perform this on a separate queue? - // It may greatly increase the time needed to finish updating the value - await notifyRemoteObservers() + changeCallbacks.forEach { $0(value) } return true } @@ -319,83 +311,7 @@ public actor Metric where T: MetricValue { _lastValue = lastValue if let lastValue { _ = try? await fileWriter.write(lastValue: lastValue) - } - // TODO: Perform this on a separate queue? - // It may greatly increase the time needed to finish updating the value - await notifyRemoteObservers() - } - - // MARK: Remote notifications - - /// The remote observers of the metric - private var remoteObservers: Set = [] - - /// The timeout (in seconds) for requests to notify remote observers - private(set) public var remoteObserverNotificationTimeout: TimeInterval = 10.0 - - /** - Set the timeout for requests to update remote observers. - - Parameter timeout: The timeout in seconds. - */ - public func setRemoteNotificationTimeout(_ timeout: TimeInterval) { - remoteObserverNotificationTimeout = timeout - } - - /** - Add a remote to receive notifications when an update to the metric occurs. - - The remote observer must be an instance of a `MetricObserver` exposed through Vapor. - - SeeAlso: Check the documentation about `ClairvoyantVapor` on how to setup `Vapor` with `Clairvoyant` - - Note: Observers are distinguished by their url, and only one observer can be presented for each unique url. - - Returns: `true`, if the observer was added. `false`, if an observer for the same url already exists. - */ - @discardableResult - public func addRemoteObserver(_ remoteObserver: RemoteMetricObserver) -> Bool { - remoteObservers.insert(remoteObserver).inserted - } - - /** - Try to send all pending values to remote observers. - - - Parameter timeout: The time to wait for each request to complete. Uses ``remoteObserverNotificationTimeout`` if unset - - Note: This function is called automatically when the metric value changes. - There is usually no need to call this function manually. - */ - public func notifyRemoteObservers(timeout: TimeInterval? = nil) async { - guard !remoteObservers.isEmpty else { - return - } - await withTaskGroup(of: Void.self) { group in - for observer in remoteObservers { - group.addTask { - await self.push(to: observer, timeout: timeout) - } - } - } - } - - @discardableResult - private func push(to remoteObserver: RemoteMetricObserver, timeout: TimeInterval?) async -> Bool { - let remoteUrl = remoteObserver.remoteUrl - do { - let route = ServerRoute.pushValueToMetric(idHash) - let url = remoteUrl.appendingPathComponent(route.rawValue) - var request = URLRequest(url: url) - request.timeoutInterval = timeout ?? remoteObserverNotificationTimeout - request.setValue(remoteObserver.authenticationToken, forHTTPHeaderField: "token") - let (_, response) = try await URLSession.shared.data(for: request) - guard let response = response as? HTTPURLResponse else { - await log("Invalid response pushing value to \(remoteUrl.path): \(response)") - return false - } - guard response.statusCode == 200 else { - await log("Failed to push value to \(remoteUrl.path): Response \(response.statusCode)") - return false - } - return true - } catch { - await log("Failed to push value to \(remoteUrl.path): \(error)") - return false + changeCallbacks.forEach { $0(lastValue) } } } @@ -413,6 +329,16 @@ public actor Metric where T: MetricValue { _lastValue = nil } } + + // MARK: Change callbacks + + public func onChange(perform callback: @escaping MetricChangeCallback) { + changeCallbacks.append(callback) + } + + public func removeAllChangeListeners() { + changeCallbacks.removeAll() + } } extension Metric: AbstractMetric { diff --git a/Sources/Clairvoyant/Metric/MetricInfo.swift b/Sources/Clairvoyant/Metric/MetricInfo.swift index e3f4e81..8b5e00f 100644 --- a/Sources/Clairvoyant/Metric/MetricInfo.swift +++ b/Sources/Clairvoyant/Metric/MetricInfo.swift @@ -11,14 +11,6 @@ public struct MetricInfo { /// The data type of the values in the metric public let dataType: MetricType - /** - Indicates that this metric allows receiving updates from remotes. - - If this property is `true`, then the metric can be updated externally through the `push` route of a Vapor observer. - - Note: This property is only relevant if the functionality of `ClairvoyantVapor` is used. - */ - public let canBeUpdatedByRemote: Bool - /** Indicates that the metric writes values to disk locally. @@ -39,13 +31,11 @@ public struct MetricInfo { - Parameter dataType: The data type of the values in the metric - Parameter name: A descriptive name of the metric - Parameter description: A textual description of the metric - - Parameter canBeUpdatedByRemote: Indicate if the metric can be set through the Web API - Parameter keepsLocalHistoryData: Indicate if the metric should persist the history to disk */ - public init(id: String, dataType: MetricType, canBeUpdatedByRemote: Bool = false, keepsLocalHistoryData: Bool = true, name: String? = nil, description: String? = nil) { + public init(id: String, dataType: MetricType, keepsLocalHistoryData: Bool = true, name: String? = nil, description: String? = nil) { self.id = id self.dataType = dataType - self.canBeUpdatedByRemote = canBeUpdatedByRemote self.keepsLocalHistoryData = keepsLocalHistoryData self.name = name self.description = description @@ -59,7 +49,6 @@ extension MetricInfo: Codable { case dataType = 2 case name = 3 case description = 4 - case canBeUpdatedByRemote = 5 case keepsLocalHistoryData = 6 } diff --git a/Sources/Clairvoyant/Metric/MetricObserver.swift b/Sources/Clairvoyant/Metric/MetricObserver.swift index 803d3e9..63547b0 100644 --- a/Sources/Clairvoyant/Metric/MetricObserver.swift +++ b/Sources/Clairvoyant/Metric/MetricObserver.swift @@ -73,7 +73,6 @@ public final class MetricObserver { logId: logMetricId, name: logMetricName, description: logMetricDescription, - canBeUpdatedByRemote: false, keepsLocalHistoryData: true, logFolder: logFolder, encoder: encoder, @@ -91,11 +90,10 @@ public final class MetricObserver { - Parameter name: A descriptive name of the metric - Parameter description: A textual description of the metric - Parameter keepsLocalHistoryData: Indicate if the metric should persist the history to disk - - Parameter canBeUpdatedByRemote: Indicate if the metric can be set through the Web API - Returns: The created or existing metric. - Note: If a metric with the same `id` and `type` already exists, then this one is returned. Other properties (`name`, `description`, ...) are then ignored. */ - public func addMetric(id: String, containing type: T.Type = T.self, name: String? = nil, description: String? = nil, canBeUpdatedByRemote: Bool = false, keepsLocalHistoryData: Bool = true) -> Metric where T: MetricValue { + public func addMetric(id: String, containing type: T.Type = T.self, name: String? = nil, description: String? = nil, keepsLocalHistoryData: Bool = true) -> Metric where T: MetricValue { if let existing = observedMetrics[id.hashed()] { guard let same = existing as? Metric else { fatalError("Two metrics with same id '\(id)' but different types where added to the same observer") @@ -105,7 +103,6 @@ public final class MetricObserver { let metric = Metric( id: id, calledFromObserver: self, - canBeUpdatedByRemote: canBeUpdatedByRemote, keepsLocalHistoryData: keepsLocalHistoryData, name: name, description: description, fileSize: maximumFileSizeInBytes) diff --git a/Sources/Clairvoyant/Routes/ServerRoute.swift b/Sources/Clairvoyant/Routes/ServerRoute.swift deleted file mode 100644 index 112c389..0000000 --- a/Sources/Clairvoyant/Routes/ServerRoute.swift +++ /dev/null @@ -1,122 +0,0 @@ -import Foundation - -/** - The routes existing on a Clairvoyant Vapor server. - */ -public enum ServerRoute { - - /// Get the info for a metric - case getMetricInfo(MetricIdHash) - - /// Get a list of all metrics - case getMetricList - - /// Get the last value of a specific metric - case lastValue(MetricIdHash) - - /// Get last values of all metrics - case allLastValues - - /// Get a list of all metrics with their last values - case extendedInfoList - - /// Get past values of a specific metric - case metricHistory(MetricIdHash) - - /// Update the value of a metric - case pushValueToMetric(MetricIdHash) - - /// The full path of the route - public var rawValue: String { - switch self { - case .getMetricInfo(let hash): return Prefix.getMetricInfo.appending(hash: hash) - case .getMetricList: return Prefix.getMetricList.rawValue - case .lastValue(let hash): return Prefix.lastValue.appending(hash: hash) - case .allLastValues: return Prefix.allLastValues.rawValue - case .extendedInfoList: return Prefix.extendedInfoList.rawValue - case .metricHistory(let hash): return Prefix.metricHistory.appending(hash: hash) - case .pushValueToMetric(let hash): return Prefix.pushValueToMetric.appending(hash: hash) - } - } - - /// The HTTP header key used for access tokens - public static var headerAccessToken = "token" - - /// The start of the route, excluding hashes - public var prefix: Prefix { - switch self { - case .getMetricInfo: - return .getMetricInfo - case .getMetricList: - return .getMetricList - case .lastValue: - return .lastValue - case .allLastValues: - return .allLastValues - case .extendedInfoList: - return .extendedInfoList - case .metricHistory: - return .metricHistory - case .pushValueToMetric: - return .pushValueToMetric - } - } - - /// The prefix of a server route - public enum Prefix: String { - case getMetricInfo = "info" - case getMetricList = "list" - case lastValue = "last" - case allLastValues = "last/all" - case extendedInfoList = "list/extended" - case metricHistory = "history" - case pushValueToMetric = "push" - - /** - Create a full server route by adding the hash of a metric. - - Parameter hash: The metric id hash to add. - - Returns: The full route - */ - public func with(hash: MetricIdHash) -> ServerRoute { - switch self { - case .getMetricInfo: return .getMetricInfo(hash) - case .getMetricList: return .getMetricList - case .lastValue: return .lastValue(hash) - case .allLastValues: return .allLastValues - case .extendedInfoList: return .extendedInfoList - case .metricHistory: return .metricHistory(hash) - case .pushValueToMetric: return .pushValueToMetric(hash) - } - } - - /** - Create a full server route by adding the hash of a metric. - - Parameter hash: The metric id hash to add. - - Returns: The full route as a string - */ - public func appending(hash: MetricIdHash) -> String { - return rawValue + "/" + hash - } - } -} - -extension ServerRoute: Equatable { - - public static func == (lhs: ServerRoute, rhs: ServerRoute) -> Bool { - lhs.rawValue == rhs.rawValue - } -} - -extension ServerRoute: Hashable { - - public func hash(into hasher: inout Hasher) { - hasher.combine(rawValue) - } -} - -extension ServerRoute: CustomStringConvertible { - - public var description: String { - rawValue - } -}