-
Notifications
You must be signed in to change notification settings - Fork 56
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: add NumberSignal basic support (#2502)
* feat: add full stack NumberSignal Fixes #2429 --------- Co-authored-by: Vlad Rindevich <[email protected]>
- Loading branch information
Showing
28 changed files
with
2,539 additions
and
2,381 deletions.
There are no files selected for viewing
Large diffs are not rendered by default.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
163 changes: 163 additions & 0 deletions
163
packages/java/endpoint/src/main/java/com/vaadin/hilla/signals/NumberSignal.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,163 @@ | ||
package com.vaadin.hilla.signals; | ||
|
||
import com.fasterxml.jackson.databind.node.ObjectNode; | ||
|
||
import com.vaadin.hilla.signals.core.StateEvent; | ||
import jakarta.annotation.Nullable; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
import reactor.core.publisher.Flux; | ||
import reactor.core.publisher.Sinks; | ||
|
||
import java.util.HashSet; | ||
import java.util.Objects; | ||
import java.util.Set; | ||
import java.util.UUID; | ||
import java.util.concurrent.locks.ReentrantLock; | ||
|
||
/** | ||
* A signal that holds a number value. | ||
*/ | ||
public class NumberSignal { | ||
|
||
private static final Logger LOGGER = LoggerFactory | ||
.getLogger(NumberSignal.class); | ||
|
||
private final ReentrantLock lock = new ReentrantLock(); | ||
|
||
private final UUID id = UUID.randomUUID(); | ||
|
||
private Double value; | ||
|
||
private final Set<Sinks.Many<ObjectNode>> subscribers = new HashSet<>(); | ||
|
||
/** | ||
* Creates a new NumberSignal with the provided default value. | ||
* | ||
* @param defaultValue | ||
* the default value | ||
*/ | ||
public NumberSignal(@Nullable Double defaultValue) { | ||
this.value = defaultValue; | ||
} | ||
|
||
/** | ||
* Creates a new NumberSignal with the default value of 0. | ||
*/ | ||
public NumberSignal() { | ||
this(0.0); | ||
} | ||
|
||
/** | ||
* Subscribes to the signal. | ||
* | ||
* @return a Flux of JSON events | ||
*/ | ||
public Flux<ObjectNode> subscribe() { | ||
Sinks.Many<ObjectNode> sink = Sinks.many().unicast() | ||
.onBackpressureBuffer(); | ||
|
||
return sink.asFlux().doOnSubscribe(ignore -> { | ||
LOGGER.debug("New Flux subscription..."); | ||
lock.lock(); | ||
try { | ||
var currentValue = createSnapshot(); | ||
sink.tryEmitNext(currentValue); | ||
subscribers.add(sink); | ||
} finally { | ||
lock.unlock(); | ||
} | ||
}).doFinally(ignore -> { | ||
lock.lock(); | ||
try { | ||
LOGGER.debug("Unsubscribing from NumberSignal..."); | ||
subscribers.remove(sink); | ||
} finally { | ||
lock.unlock(); | ||
} | ||
}); | ||
} | ||
|
||
/** | ||
* Submits an event to the signal and notifies subscribers about the change | ||
* of the signal value. | ||
* | ||
* @param event | ||
* the event to submit | ||
*/ | ||
public void submit(ObjectNode event) { | ||
lock.lock(); | ||
try { | ||
processEvent(event); | ||
// Notify subscribers | ||
subscribers.removeIf(sink -> { | ||
var updatedValue = createSnapshot(); | ||
boolean failure = sink.tryEmitNext(updatedValue).isFailure(); | ||
if (failure) { | ||
LOGGER.debug("Failed push"); | ||
} | ||
return failure; | ||
}); | ||
} finally { | ||
lock.unlock(); | ||
} | ||
} | ||
|
||
/** | ||
* Returns the signal UUID. | ||
* | ||
* @return the id | ||
*/ | ||
public UUID getId() { | ||
return this.id; | ||
} | ||
|
||
/** | ||
* Returns the signal's current value. | ||
* | ||
* @return the value | ||
*/ | ||
@Nullable | ||
public Double getValue() { | ||
return this.value; | ||
} | ||
|
||
private ObjectNode createSnapshot() { | ||
var snapshot = new StateEvent<>(this.id, StateEvent.EventType.SNAPSHOT, | ||
this.value); | ||
return snapshot.toJson(); | ||
} | ||
|
||
private void processEvent(ObjectNode event) { | ||
try { | ||
var stateEvent = new StateEvent<Double>(event); | ||
if (isSetEvent(stateEvent)) { | ||
this.value = stateEvent.getValue(); | ||
} else { | ||
throw new UnsupportedOperationException( | ||
"Unsupported event: " + event); | ||
} | ||
} catch (StateEvent.InvalidEventTypeException e) { | ||
throw new UnsupportedOperationException( | ||
"Unsupported JSON: " + event, e); | ||
} | ||
} | ||
|
||
private boolean isSetEvent(StateEvent<?> event) { | ||
return StateEvent.EventType.SET.equals(event.getEventType()); | ||
} | ||
|
||
@Override | ||
public boolean equals(Object o) { | ||
if (this == o) | ||
return true; | ||
if (!(o instanceof NumberSignal signal)) | ||
return false; | ||
return Objects.equals(getId(), signal.getId()); | ||
} | ||
|
||
@Override | ||
public int hashCode() { | ||
return Objects.hashCode(getId()); | ||
} | ||
} |
54 changes: 54 additions & 0 deletions
54
...ges/java/endpoint/src/main/java/com/vaadin/hilla/signals/config/SignalsConfiguration.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,54 @@ | ||
package com.vaadin.hilla.signals.config; | ||
|
||
import com.vaadin.hilla.ConditionalOnFeatureFlag; | ||
import com.vaadin.hilla.EndpointInvoker; | ||
import com.vaadin.hilla.signals.handler.SignalsHandler; | ||
import com.vaadin.hilla.signals.core.SignalsRegistry; | ||
import org.springframework.context.annotation.Bean; | ||
import org.springframework.context.annotation.Configuration; | ||
|
||
/** | ||
* Spring beans configuration for signals. | ||
*/ | ||
@Configuration | ||
public class SignalsConfiguration { | ||
|
||
private SignalsRegistry signalsRegistry; | ||
private SignalsHandler signalsHandler; | ||
private final EndpointInvoker endpointInvoker; | ||
|
||
public SignalsConfiguration(EndpointInvoker endpointInvoker) { | ||
this.endpointInvoker = endpointInvoker; | ||
} | ||
|
||
/** | ||
* Initializes the SignalsRegistry bean when the fullstackSignals feature | ||
* flag is enabled. | ||
* | ||
* @return SignalsRegistry bean instance | ||
*/ | ||
@ConditionalOnFeatureFlag("fullstackSignals") | ||
@Bean | ||
public SignalsRegistry signalsRegistry() { | ||
if (signalsRegistry == null) { | ||
signalsRegistry = new SignalsRegistry(); | ||
} | ||
return signalsRegistry; | ||
} | ||
|
||
/** | ||
* Initializes the SignalsHandler endpoint when the fullstackSignals feature | ||
* flag is enabled. | ||
* | ||
* @return SignalsHandler endpoint instance | ||
*/ | ||
@ConditionalOnFeatureFlag("fullstackSignals") | ||
@Bean | ||
public SignalsHandler signalsHandler() { | ||
if (signalsHandler == null) { | ||
signalsHandler = new SignalsHandler(signalsRegistry(), | ||
endpointInvoker); | ||
} | ||
return signalsHandler; | ||
} | ||
} |
Oops, something went wrong.