Skip to content

Commit

Permalink
Replace remote observer logic with change callbacks
Browse files Browse the repository at this point in the history
  • Loading branch information
christophhagen committed Sep 30, 2023
1 parent 03a4c10 commit 09cd0b9
Show file tree
Hide file tree
Showing 7 changed files with 20 additions and 273 deletions.
13 changes: 0 additions & 13 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -144,19 +144,6 @@ try await metric.update(newPlayer)
Logging values to disk is great, but the data should also be available for inspection and monitoring.
Clairvoyant provides a separate package [ClairvoyantVapor](https://github.com/christophhagen/ClairvoyantVapor) to integrate metric access into Vapor servers.

### Receiving from other servers

To receive a metric pushed from a remote server, configure a metric with `canBeUpdatedByRemote = true`. Any time a new value is received the metric will be updated with this value.
The last value as well as history data can be accessed as with any other metric.

### Pushing to other servers

Metrics can be configured to automatically transmit the logged values to a remote server for monitoring or persistence.
To configure this feature, one or more `RemoteMetricObserver`s can be added to each metric.
Whenever a new value is set, then the metric attempts to send all pending updates (including any previously failed values) to the remote observer using the `push` route specified above.

The remote server must have a metric with the same `id` registered with the observer, and the metric must be configured with `canBeUpdatedByRemote = true`.

## Initial requirements

**Allow publishing of individual metrics** *Implemented*
Expand Down
27 changes: 0 additions & 27 deletions Sources/Clairvoyant/Extensions/URLSession+Extensions.swift

This file was deleted.

3 changes: 0 additions & 3 deletions Sources/Clairvoyant/Metric/GenericMetric.swift
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,6 @@ public protocol GenericMetric {
/// The information about the metric
var info: MetricInfo { get }

/// Indicate that the metric value can be set via remote access
var canBeUpdatedByRemote: Bool { get }

/**
Get the last value data of the metric.

Expand Down
110 changes: 18 additions & 92 deletions Sources/Clairvoyant/Metric/Metric.swift
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ import Foundation
import FoundationNetworking
#endif

public typealias MetricChangeCallback<T> = (Timestamped<T>) -> Void

/**
A metric is a single piece of state that is provided by an application.
Changes to the state can be used to update the metric,
Expand Down Expand Up @@ -32,10 +34,7 @@ public actor Metric<T> where T: MetricValue {

private let fileWriter: LogFileWriter<T>

/// Indicate if the metric can be updated by a remote user
public nonisolated var canBeUpdatedByRemote: Bool {
info.canBeUpdatedByRemote
}
private var changeCallbacks: [MetricChangeCallback<T>] = []

/**
Indicates that the metric writes values to disk locally.
Expand Down Expand Up @@ -90,17 +89,15 @@ public actor Metric<T> where T: MetricValue {

- Parameter id: The unique id of the metric.
- Parameter observer: The metric observer calling this initializer
- Parameter canBeUpdatedByRemote: Indicate if the metric can be set through the Web API
- Parameter keepsLocalHistoryData: Indicate if the metric should persist the history to disk
- Parameter name: A descriptive name of the metric
- Parameter description: A textual description of the metric
- Parameter fileSize: The maximum size of files in bytes
*/
init(id: String, calledFromObserver observer: MetricObserver, canBeUpdatedByRemote: Bool, keepsLocalHistoryData: Bool, name: String?, description: String?, fileSize: Int) {
init(id: String, calledFromObserver observer: MetricObserver, keepsLocalHistoryData: Bool, name: String?, description: String?, fileSize: Int) {
let info = MetricInfo(
id: id,
dataType: T.valueType,
canBeUpdatedByRemote: canBeUpdatedByRemote,
keepsLocalHistoryData: keepsLocalHistoryData,
name: name,
description: description)
Expand Down Expand Up @@ -138,11 +135,10 @@ public actor Metric<T> where T: MetricValue {
Create a new log metric for a `MetricObserver`
- Note: This constructor does not link back to an observer for logging errors, since this would just divert back to the this metric again.
*/
init(logId id: String, name: String?, description: String?, canBeUpdatedByRemote: Bool, keepsLocalHistoryData: Bool, logFolder: URL, encoder: BinaryEncoder, decoder: BinaryDecoder, fileSize: Int) {
init(logId id: String, name: String?, description: String?, keepsLocalHistoryData: Bool, logFolder: URL, encoder: BinaryEncoder, decoder: BinaryDecoder, fileSize: Int) {
self.info = .init(
id: id,
dataType: T.valueType,
canBeUpdatedByRemote: canBeUpdatedByRemote,
keepsLocalHistoryData: keepsLocalHistoryData,
name: name,
description: description)
Expand All @@ -166,20 +162,18 @@ public actor Metric<T> where T: MetricValue {
- Parameter dataType: The raw type of the values contained in the metric
- Parameter name: A descriptive name of the metric
- Parameter description: A textual description of the metric
- Parameter canBeUpdatedByRemote: Indicate if the metric can be set through the Web API
- Parameter keepsLocalHistoryData: Indicate if the metric should persist the history to disk
- Parameter fileSize: The maximum size of files in bytes
- Note: This initializer crashes with a `fatalError`, if `MetricObserver.standard` has not been set.
- Note: This initializer crashes with a `fatalError`, if a metric with the same `id` is already registered with the observer.
*/
public init(_ id: String, containing dataType: T.Type = T.self, name: String? = nil, description: String? = nil, canBeUpdatedByRemote: Bool = false, keepsLocalHistoryData: Bool = true, fileSize: Int = 10_000_000) {
public init(_ id: String, containing dataType: T.Type = T.self, name: String? = nil, description: String? = nil, keepsLocalHistoryData: Bool = true, fileSize: Int = 10_000_000) {
guard let observer = MetricObserver.standard else {
fatalError("Initialize the standard observer first by setting `MetricObserver.standard` before creating a metric")
}
let info = MetricInfo(
id: id,
dataType: T.valueType,
canBeUpdatedByRemote: canBeUpdatedByRemote,
keepsLocalHistoryData: keepsLocalHistoryData,
name: name,
description: description)
Expand Down Expand Up @@ -284,9 +278,7 @@ public actor Metric<T> where T: MetricValue {
try await fileWriter.write(value)
}
_lastValue = value
// TODO: Perform this on a separate queue?
// It may greatly increase the time needed to finish updating the value
await notifyRemoteObservers()
changeCallbacks.forEach { $0(value) }
return true
}

Expand Down Expand Up @@ -319,83 +311,7 @@ public actor Metric<T> where T: MetricValue {
_lastValue = lastValue
if let lastValue {
_ = try? await fileWriter.write(lastValue: lastValue)
}
// TODO: Perform this on a separate queue?
// It may greatly increase the time needed to finish updating the value
await notifyRemoteObservers()
}

// MARK: Remote notifications

/// The remote observers of the metric
private var remoteObservers: Set<RemoteMetricObserver> = []

/// The timeout (in seconds) for requests to notify remote observers
private(set) public var remoteObserverNotificationTimeout: TimeInterval = 10.0

/**
Set the timeout for requests to update remote observers.
- Parameter timeout: The timeout in seconds.
*/
public func setRemoteNotificationTimeout(_ timeout: TimeInterval) {
remoteObserverNotificationTimeout = timeout
}

/**
Add a remote to receive notifications when an update to the metric occurs.

The remote observer must be an instance of a `MetricObserver` exposed through Vapor.
- SeeAlso: Check the documentation about `ClairvoyantVapor` on how to setup `Vapor` with `Clairvoyant`
- Note: Observers are distinguished by their url, and only one observer can be presented for each unique url.
- Returns: `true`, if the observer was added. `false`, if an observer for the same url already exists.
*/
@discardableResult
public func addRemoteObserver(_ remoteObserver: RemoteMetricObserver) -> Bool {
remoteObservers.insert(remoteObserver).inserted
}

/**
Try to send all pending values to remote observers.

- Parameter timeout: The time to wait for each request to complete. Uses ``remoteObserverNotificationTimeout`` if unset
- Note: This function is called automatically when the metric value changes.
There is usually no need to call this function manually.
*/
public func notifyRemoteObservers(timeout: TimeInterval? = nil) async {
guard !remoteObservers.isEmpty else {
return
}
await withTaskGroup(of: Void.self) { group in
for observer in remoteObservers {
group.addTask {
await self.push(to: observer, timeout: timeout)
}
}
}
}

@discardableResult
private func push(to remoteObserver: RemoteMetricObserver, timeout: TimeInterval?) async -> Bool {
let remoteUrl = remoteObserver.remoteUrl
do {
let route = ServerRoute.pushValueToMetric(idHash)
let url = remoteUrl.appendingPathComponent(route.rawValue)
var request = URLRequest(url: url)
request.timeoutInterval = timeout ?? remoteObserverNotificationTimeout
request.setValue(remoteObserver.authenticationToken, forHTTPHeaderField: "token")
let (_, response) = try await URLSession.shared.data(for: request)
guard let response = response as? HTTPURLResponse else {
await log("Invalid response pushing value to \(remoteUrl.path): \(response)")
return false
}
guard response.statusCode == 200 else {
await log("Failed to push value to \(remoteUrl.path): Response \(response.statusCode)")
return false
}
return true
} catch {
await log("Failed to push value to \(remoteUrl.path): \(error)")
return false
changeCallbacks.forEach { $0(lastValue) }
}
}

Expand All @@ -413,6 +329,16 @@ public actor Metric<T> where T: MetricValue {
_lastValue = nil
}
}

// MARK: Change callbacks

public func onChange(perform callback: @escaping MetricChangeCallback<T>) {
changeCallbacks.append(callback)
}

public func removeAllChangeListeners() {
changeCallbacks.removeAll()
}
}

extension Metric: AbstractMetric {
Expand Down
13 changes: 1 addition & 12 deletions Sources/Clairvoyant/Metric/MetricInfo.swift
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,6 @@ public struct MetricInfo {
/// The data type of the values in the metric
public let dataType: MetricType

/**
Indicates that this metric allows receiving updates from remotes.

If this property is `true`, then the metric can be updated externally through the `push` route of a Vapor observer.
- Note: This property is only relevant if the functionality of `ClairvoyantVapor` is used.
*/
public let canBeUpdatedByRemote: Bool

/**
Indicates that the metric writes values to disk locally.

Expand All @@ -39,13 +31,11 @@ public struct MetricInfo {
- Parameter dataType: The data type of the values in the metric
- Parameter name: A descriptive name of the metric
- Parameter description: A textual description of the metric
- Parameter canBeUpdatedByRemote: Indicate if the metric can be set through the Web API
- Parameter keepsLocalHistoryData: Indicate if the metric should persist the history to disk
*/
public init(id: String, dataType: MetricType, canBeUpdatedByRemote: Bool = false, keepsLocalHistoryData: Bool = true, name: String? = nil, description: String? = nil) {
public init(id: String, dataType: MetricType, keepsLocalHistoryData: Bool = true, name: String? = nil, description: String? = nil) {
self.id = id
self.dataType = dataType
self.canBeUpdatedByRemote = canBeUpdatedByRemote
self.keepsLocalHistoryData = keepsLocalHistoryData
self.name = name
self.description = description
Expand All @@ -59,7 +49,6 @@ extension MetricInfo: Codable {
case dataType = 2
case name = 3
case description = 4
case canBeUpdatedByRemote = 5
case keepsLocalHistoryData = 6
}

Expand Down
5 changes: 1 addition & 4 deletions Sources/Clairvoyant/Metric/MetricObserver.swift
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ public final class MetricObserver {
logId: logMetricId,
name: logMetricName,
description: logMetricDescription,
canBeUpdatedByRemote: false,
keepsLocalHistoryData: true,
logFolder: logFolder,
encoder: encoder,
Expand All @@ -91,11 +90,10 @@ public final class MetricObserver {
- Parameter name: A descriptive name of the metric
- Parameter description: A textual description of the metric
- Parameter keepsLocalHistoryData: Indicate if the metric should persist the history to disk
- Parameter canBeUpdatedByRemote: Indicate if the metric can be set through the Web API
- Returns: The created or existing metric.
- Note: If a metric with the same `id` and `type` already exists, then this one is returned. Other properties (`name`, `description`, ...) are then ignored.
*/
public func addMetric<T>(id: String, containing type: T.Type = T.self, name: String? = nil, description: String? = nil, canBeUpdatedByRemote: Bool = false, keepsLocalHistoryData: Bool = true) -> Metric<T> where T: MetricValue {
public func addMetric<T>(id: String, containing type: T.Type = T.self, name: String? = nil, description: String? = nil, keepsLocalHistoryData: Bool = true) -> Metric<T> where T: MetricValue {
if let existing = observedMetrics[id.hashed()] {
guard let same = existing as? Metric<T> else {
fatalError("Two metrics with same id '\(id)' but different types where added to the same observer")
Expand All @@ -105,7 +103,6 @@ public final class MetricObserver {
let metric = Metric<T>(
id: id,
calledFromObserver: self,
canBeUpdatedByRemote: canBeUpdatedByRemote,
keepsLocalHistoryData: keepsLocalHistoryData,
name: name, description: description,
fileSize: maximumFileSizeInBytes)
Expand Down
Loading

0 comments on commit 09cd0b9

Please sign in to comment.