Skip to content

Commit

Permalink
Common tags (#549)
Browse files Browse the repository at this point in the history
  • Loading branch information
mikkokar authored Dec 4, 2019
1 parent 57ca491 commit 7235bb3
Show file tree
Hide file tree
Showing 12 changed files with 344 additions and 102 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright (C) 2013-2018 Expedia Inc.
Copyright (C) 2013-2019 Expedia Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -29,6 +29,7 @@
import static com.hotels.styx.api.HttpResponse.response;
import static com.hotels.styx.api.HttpResponseStatus.OK;
import static com.hotels.styx.api.extension.service.spi.StyxServiceStatus.CREATED;
import static com.hotels.styx.api.extension.service.spi.StyxServiceStatus.FAILED;
import static com.hotels.styx.api.extension.service.spi.StyxServiceStatus.RUNNING;
import static com.hotels.styx.api.extension.service.spi.StyxServiceStatus.STARTING;
import static com.hotels.styx.api.extension.service.spi.StyxServiceStatus.STOPPED;
Expand Down Expand Up @@ -91,7 +92,7 @@ public CompletableFuture<Void> stop() {

private Function<Throwable, Void> failWithMessage(String message) {
return cause -> {
status.set(StyxServiceStatus.FAILED);
status.set(FAILED);
throw new ServiceFailureException(message, cause);
};
}
Expand Down
121 changes: 121 additions & 0 deletions components/proxy/src/main/kotlin/com/hotels/styx/CommonTags.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/*
Copyright (C) 2013-2019 Expedia Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package com.hotels.styx

/**
* A common tag representation.
*
* Enforces a common tag format and provides a set of higher order functionality
* that operate on provided `name`, `encode` and `decode` properties.
*
* A tag string is a name-value pair separated by equal ('=') sign:
*
* tag-string = name "=" value-part
*
* `encode` and `decode` are functions to encode a T to a value-part string,
* and to decode the value-part string back to T.
*
* @property name a tag name
* @property encode a function that encodes a value of T as a tag value string
* @property decode a function that decodes a tag value string as a T
*/
sealed class CommonValueTag<T>(
val name: String,
val encode: (T) -> String?,
val decode: (String) -> T?) {

/**
* Extracts value part (the right hand side) from a tag string.
*
* @param tag a tag string
* @return the tag value
*/
fun valuePart(tag: String) = if (tag.startsWith("$name=")) {
tag.removePrefix("$name=")
} else {
null
}

/**
* Tests if a given string matches this tag. A match is positive when the string
* starts with `name` followed by `=`.
*
* @param a tag string
* @return True if this string is possibly a matching tag. Otherwise return false.
*/
fun match(tag: String) = tag.startsWith("$name=")

/**
* Decodes given tag string to its typed value.
*
* @param tag a tag string
* @return a decoded tag value, or null if decoding failed
*/
fun valueOf(tag: String) = valuePart(tag)
?.let { decode(it) }

/**
* Find this tag from a set of tag strings. If found, decode the tag value. Return null if
* tag was not found, or if decoding failed.
*
* @param tags a set of tag strings
* @return A decoded value if tag was found. Otherwise return null.
*/
fun find(tags: Set<String>) = tags.firstOrNull { this.match(it) }
?.let { valueOf(it) }

/**
* Removes all instances of this tag from a set of tag strings.
* Return a new Set<String> with all instances of this tag removed.
*
* @param tags a set of tag strings
* @return A new set of strings without this tag.
*/
fun remove(tags: Set<String>) = tags
.filterNot { this.match(it) }
.toSet()
}

/**
* A NullableValueTag invoke method returns null when value cannot be encoded to string.
* The API consumer must handle this situation.
*/
class NullableValueTag<T>(
name: String,
encode: (T) -> String?,
decode: (String) -> T?) : CommonValueTag<T>(name, encode, decode) {

operator fun invoke(value: T): String? = encode(value)
?.let {
"$name=$it"
}
}

/**
* A SafeValueTag invoke method throws a KotlinNullPointerException when the
* tag value cannot be encoded to string.
*/
class SafeValueTag<T>(
name: String,
encode: (T) -> String?,
decode: (String) -> T?) : CommonValueTag<T>(name, encode, decode) {

operator fun invoke(value: T): String = encode(value)
.let {
it!!
"$name=$it"
}
}
93 changes: 49 additions & 44 deletions components/proxy/src/main/kotlin/com/hotels/styx/ObjectTags.kt
Original file line number Diff line number Diff line change
Expand Up @@ -15,58 +15,63 @@
*/
package com.hotels.styx

private const val LBGROUP = "lbGroup"
private val LBGROUP_REGEX = "$LBGROUP=(.+)".toRegex()
fun lbGroupTag(name: String) = "lbGroup=$name"
fun lbGroupTag(tags: Set<String>) = tags.firstOrNull(::isLbGroupTag)
fun isLbGroupTag(tag: String) = LBGROUP_REGEX.matches(tag)
fun lbGroupTagValue(tags: Set<String>) = lbGroupTagValue(lbGroupTag(tags)?:"")
fun lbGroupTagValue(tag: String): String? = LBGROUP_REGEX.matchEntire(tag)
?.groupValues
?.get(1)
/*
* TAG: lbGroup
*/
val lbGroupTag = SafeValueTag(
"lbGroup",
{ it },
{ it })

/*
* TAG: source
*/
val sourceTag = SafeValueTag(
"source",
{ it },
{ it })

fun sourceTag(creator: String) = "source=$creator"
fun sourceTag(tags: Set<String>) = tags.firstOrNull { it.startsWith("source=") }
fun sourceTagValue(tags: Set<String>) = sourceTag(tags)?.substring("source".length + 1)

private const val STATE = "state"
/*
* TAG: state
*/
const val STATE_ACTIVE = "active"
const val STATE_UNREACHABLE = "unreachable"
const val STATE_INACTIVE = "inactive"
private val STATE_REGEX = "$STATE=(.+)".toRegex()
fun stateTag(value: String) = "$STATE=$value"
fun stateTag(tags: Set<String>) = tags.firstOrNull(::isStateTag)
fun isStateTag(tag: String) = STATE_REGEX.matches(tag)
fun stateTagValue(tags: Set<String>) = stateTagValue(stateTag(tags)?:"")
fun stateTagValue(tag: String) = STATE_REGEX.matchEntire(tag)
?.groupValues
?.get(1)

private const val HEALTHCHECK = "healthCheck"
val stateTag = SafeValueTag(
"state",
{ it },
{ it })

/*
* TAG: healthCheck
* healthCheck=on
* healthCheck=on;probes-OK:2
* healthCheck=on;probes-FAIL:1
*/
const val HEALTHCHECK_PASSING = "probes-OK"
const val HEALTHCHECK_FAILING = "probes-FAIL"
const val HEALTHCHECK_ON = "on"
private val HEALTHCHECK_REGEX = "$HEALTHCHECK_ON(?:;(.+):([0-9]+))?".toRegex()


// healthCheck=on
// healthCheck=on;probes-OK:2
// healthCheck=on;probes-FAIL:1
private val HEALTHCHECK_REGEX = "$HEALTHCHECK=$HEALTHCHECK_ON(?:;(.+):([0-9]+))?".toRegex()
fun healthCheckTag(value: Pair<String, Int>?) =
if (value != null && value.first.isNotBlank() && value.second > 0) {
"$HEALTHCHECK=$HEALTHCHECK_ON;${value.first}:${value.second}"
} else if (value != null && value.first.isNotBlank() && value.second == 0) {
"$HEALTHCHECK=$HEALTHCHECK_ON"
} else {
null
}
fun healthCheckTag(tags: Set<String>) = tags.firstOrNull(::isHealthCheckTag)
fun isHealthCheckTag(tag: String) = HEALTHCHECK_REGEX.matches(tag)
fun healthCheckTagValue(tags: Set<String>) = healthCheckTagValue(healthCheckTag(tags)?:"")
fun healthCheckTagValue(tag: String) = HEALTHCHECK_REGEX.matchEntire(tag)
?.groupValues
?.let {
if (it[1].isNotEmpty()) {
Pair(it[1], it[2].toInt())
val healthCheckTag = NullableValueTag(
"healthCheck",
{ value -> if (value.first.isNotBlank() && value.second > 0) {
"$HEALTHCHECK_ON;${value.first}:${value.second}"
} else if (value.first.isNotBlank() && value.second == 0) {
HEALTHCHECK_ON
} else {
Pair(HEALTHCHECK_ON, 0)
}}
null
}
},
{ tagValue -> HEALTHCHECK_REGEX.matchEntire(tagValue)
?.groupValues
?.let {
if (it[1].isNotEmpty()) {
Pair(it[1], it[2].toInt())
} else {
Pair(HEALTHCHECK_ON, 0)
}}
})
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package com.hotels.styx.routing.handlers

import com.fasterxml.jackson.annotation.JsonProperty
import com.hotels.styx.*
import com.hotels.styx.api.Eventual
import com.hotels.styx.api.HttpInterceptor
import com.hotels.styx.api.Id
Expand All @@ -40,10 +39,12 @@ import com.hotels.styx.config.schema.SchemaDsl.integer
import com.hotels.styx.config.schema.SchemaDsl.optional
import com.hotels.styx.config.schema.SchemaDsl.string
import com.hotels.styx.infrastructure.configuration.yaml.JsonNodeConfig
import com.hotels.styx.lbGroupTag
import com.hotels.styx.routing.RoutingObject
import com.hotels.styx.routing.RoutingObjectRecord
import com.hotels.styx.routing.config.RoutingObjectFactory
import com.hotels.styx.routing.config.StyxObjectDefinition
import com.hotels.styx.stateTag
import org.slf4j.LoggerFactory
import reactor.core.Disposable
import reactor.core.publisher.toFlux
Expand Down Expand Up @@ -121,17 +122,16 @@ internal class LoadBalancingGroup(val client: StyxBackendServiceClient, val chan

private fun routeDatabaseChanged(appId: String, snapshot: ObjectStore<RoutingObjectRecord>, remoteHosts: AtomicReference<Set<RemoteHost>>) {
val newSet = snapshot.entrySet()
.filter { taggedWith(it, ::lbGroupTagValue, appId) }
.filter { taggedWith(it, ::stateTagValue, STATE_ACTIVE, null) }
.filter { it.value.tags.contains(lbGroupTag(appId)) }
.filter { stateTag.find(it.value.tags)
.let { it == null || it == "active" }
}
.map { toRemoteHost(appId, it) }
.toSet()

remoteHosts.set(newSet)
}

private fun taggedWith(recordEntry: Map.Entry<String, RoutingObjectRecord>, tagValue: (Set<String>) -> String?, vararg values: String?) =
values.contains(tagValue(recordEntry.value.tags))

private fun toRemoteHost(appId: String, record: Map.Entry<String, RoutingObjectRecord>): RemoteHost {
val routingObject = record.value.routingObject
val originName = record.key
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,15 +68,15 @@ internal class HealthCheckMonitoringService(

internal val EXECUTOR = ScheduledThreadPoolExecutor(2)

private val LOGGER = LoggerFactory.getLogger(HealthCheckMonitoringService::class.java)
internal val LOGGER = LoggerFactory.getLogger(HealthCheckMonitoringService::class.java)
}

private val probe = urlProbe(HttpRequest.get(urlPath).build(), Duration.ofMillis(1000))
private val determineObjectState = healthCheckFunction(activeThreshold, inactiveThreshold)
private val futureRef: AtomicReference<ScheduledFuture<*>> = AtomicReference()

override fun startService() = CompletableFuture.runAsync {
LOGGER.info("started - {} - {}", period.toMillis(), period.toMillis())
LOGGER.info("started service for {} - {} - {}", arrayOf(application, period.toMillis(), period.toMillis()))
futureRef.set(executor.scheduleAtFixedRate(
{ runChecks(application, objectStore) },
period.toMillis(),
Expand All @@ -85,12 +85,32 @@ internal class HealthCheckMonitoringService(
}

override fun stopService() = CompletableFuture.runAsync {
LOGGER.info("stopped")
LOGGER.info("stopped service for {}", application)

objectStore.entrySet()
.filter(::containsRelevantStateTag)
.forEach { (name, _) ->
markObject(objectStore, name, ObjectActive(0, false))
.forEach { (name, record) ->
objectStore.get(name).ifPresent {
try {
objectStore.compute(name) { previous ->
if (previous == null) throw ObjectDisappearedException()

val newTags = previous.tags
.let { healthCheckTag.remove(it) }
.let { stateTag.remove(it) }
.plus(stateTag(STATE_ACTIVE))

if (previous.tags != newTags)
it.copy(tags = newTags)
else
previous
}
} catch (e: ObjectDisappearedException) {
// Object disappeared between the ifPresent check and the compute, but we don't really mind.
// We just want to exit the compute, to avoid re-creating it.
// (The ifPresent is not strictly required, but a pre-emptive check is preferred to an exception)
}
}
}

futureRef.get().cancel(false)
Expand All @@ -102,7 +122,7 @@ internal class HealthCheckMonitoringService(
.filter { (_, record) -> record.tags.contains(lbGroupTag(application)) }
.map { (name, record) ->
val tags = record.tags
val objectHealth = objectHealthFrom(stateTagValue(tags), healthCheckTagValue(tags))
val objectHealth = objectHealthFrom(stateTag.find(tags), healthCheckTag.find(tags))
Triple(name, record, objectHealth)
}

Expand Down Expand Up @@ -177,6 +197,7 @@ internal fun objectHealthFrom(state: String?, health: Pair<String, Int>?) =

internal class ObjectDisappearedException : RuntimeException("Object disappeared")


private fun markObject(db: StyxObjectStore<RoutingObjectRecord>, name: String, newStatus: ObjectHealth) {
// The ifPresent is not ideal, but compute() does not allow the computation to return null. So we can't preserve
// a state where the object does not exist using compute alone. But even with ifPresent, as we are open to
Expand All @@ -202,14 +223,14 @@ private fun markObject(db: StyxObjectStore<RoutingObjectRecord>, name: String, n
}

internal fun reTag(tags: Set<String>, newStatus: ObjectHealth) =
tags.asSequence()
.filterNot { isStateTag(it) || isHealthCheckTag(it) }
.plus(stateTag(newStatus.state()))
.plus(healthCheckTag(newStatus.health()))
.filterNotNull()
.toSet()
tags.asSequence()
.filterNot { stateTag.match(it) || healthCheckTag.match(it) }
.plus(stateTag(newStatus.state()))
.plus(healthCheckTag(newStatus.health()!!))
.filterNotNull()
.toSet()

private val RELEVANT_STATES = setOf(STATE_ACTIVE, STATE_UNREACHABLE)
private fun containsRelevantStateTag(entry: Map.Entry<String, RoutingObjectRecord>) =
stateTagValue(entry.value.tags) in RELEVANT_STATES
stateTag.find(entry.value.tags) in RELEVANT_STATES

Loading

0 comments on commit 7235bb3

Please sign in to comment.