Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Implement pull query routing to standbys if active is down #4398

Merged
merged 11 commits into from
Feb 6, 2020
Merged
30 changes: 20 additions & 10 deletions ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -167,11 +167,21 @@ public class KsqlConfig extends AbstractConfig {
"Config to enable or disable transient pull queries on a specific KSQL server.";
public static final boolean KSQL_QUERY_PULL_ENABLE_DEFAULT = true;

public static final String KSQL_QUERY_PULL_ROUTING_TIMEOUT_MS_CONFIG =
"ksql.query.pull.routing.timeout.ms";
public static final Long KSQL_QUERY_PULL_ROUTING_TIMEOUT_MS_DEFAULT = 30000L;
public static final String KSQL_QUERY_PULL_ROUTING_TIMEOUT_MS_DOC = "Timeout in milliseconds "
+ "when waiting for the lookup of the owner of a row key";
public static final String KSQL_QUERY_PULL_ENABLE_STANDBY_READS =
"ksql.query.pull.enable.stale.reads";
private static final String KSQL_QUERY_PULL_ENABLE_STANDBY_READS_DOC =
vpapavas marked this conversation as resolved.
Show resolved Hide resolved
"Config to enable/disable forwarding pull queries to standby hosts when the active is dead. "
+ "This means that stale values may be returned for these queries since standby hosts"
+ "receive updates from the changelog topic (to which the active writes to) "
+ "asynchronously. Turning on this configuration, effectively sacrifices "
+ "consistency for higher availability. "
+ "Possible values are \"true\", \"false\". Setting to \"true\" guarantees high "
+ "availability for pull queries. If set to \"false\", pull queries will fail when"
+ "the active is dead and until a new active is elected. Default value is \"false\". "
+ "For using this functionality, the server must be configured with "
+ "to ksql.streams.num.standby.replicas >= 1";
public static final boolean KSQL_QUERY_PULL_ENABLE_STANDBY_READS_DEFAULT = false;


public static final String KSQL_QUERY_PULL_STREAMSTORE_REBALANCING_TIMEOUT_MS_CONFIG =
"ksql.query.pull.streamsstore.rebalancing.timeout.ms";
Expand Down Expand Up @@ -517,11 +527,11 @@ private static ConfigDef buildConfigDef(final ConfigGeneration generation) {
Importance.LOW,
KSQL_QUERY_PULL_ENABLE_DOC
).define(
KSQL_QUERY_PULL_ROUTING_TIMEOUT_MS_CONFIG,
ConfigDef.Type.LONG,
KSQL_QUERY_PULL_ROUTING_TIMEOUT_MS_DEFAULT,
Importance.LOW,
KSQL_QUERY_PULL_ROUTING_TIMEOUT_MS_DOC
KSQL_QUERY_PULL_ENABLE_STANDBY_READS,
Type.BOOLEAN,
KSQL_QUERY_PULL_ENABLE_STANDBY_READS_DEFAULT,
Importance.MEDIUM,
KSQL_QUERY_PULL_ENABLE_STANDBY_READS_DOC
).define(
KSQL_QUERY_PULL_STREAMSTORE_REBALANCING_TIMEOUT_MS_CONFIG,
ConfigDef.Type.LONG,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,18 @@
import com.google.errorprone.annotations.Immutable;
import java.util.Objects;


/**
* Immutable representation of {@link org.apache.kafka.streams.state.HostInfo HostInfo}
* from KStreams.
*/
@Immutable
public class KsqlHost {
public class KsqlHostInfo {

private final String host;
private final int port;

public KsqlHost(final String host, final int port) {
public KsqlHostInfo(final String host, final int port) {
this.host = host;
this.port = port;
}
Expand All @@ -42,8 +43,8 @@ public boolean equals(final Object o) {
return false;
}

final KsqlHost hostInfo = (KsqlHost) o;
return port == hostInfo.port && host.equals(hostInfo.host);
final KsqlHostInfo other = (KsqlHostInfo) o;
return this.host.equals(other.host) && port == other.port;
}

@Override
Expand All @@ -61,6 +62,6 @@ public int port() {

@Override
public String toString() {
return "KsqlHost{host='" + this.host + '\'' + ", port=" + this.port + '}';
return "KsqlHostInfo{host='" + this.host + '\'' + ", port=" + this.port + '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import io.confluent.ksql.rest.entity.KsqlEntityList;
import io.confluent.ksql.rest.entity.LagReportingMessage;
import io.confluent.ksql.rest.entity.StreamedRow;
import io.confluent.ksql.util.KsqlHost;
import io.confluent.ksql.util.KsqlHostInfo;
import java.net.URI;
import java.util.List;

Expand Down Expand Up @@ -52,7 +52,7 @@ public RestResponse<List<StreamedRow>> makeQueryRequest(
@Override
public void makeAsyncHeartbeatRequest(
final URI serverEndPoint,
final KsqlHost host,
final KsqlHostInfo host,
final long timestamp
) {
throw new UnsupportedOperationException("KSQL client is disabled");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import io.confluent.ksql.rest.entity.KsqlEntityList;
import io.confluent.ksql.rest.entity.LagReportingMessage;
import io.confluent.ksql.rest.entity.StreamedRow;
import io.confluent.ksql.util.KsqlHost;
import io.confluent.ksql.util.KsqlHostInfo;
import java.net.URI;
import java.util.List;
import javax.annotation.concurrent.ThreadSafe;
Expand All @@ -46,7 +46,7 @@ RestResponse<List<StreamedRow>> makeQueryRequest(
*/
void makeAsyncHeartbeatRequest(
URI serverEndPoint,
KsqlHost host,
KsqlHostInfo host,
long timestamp
);

Expand All @@ -58,7 +58,8 @@ void makeAsyncHeartbeatRequest(
RestResponse<ClusterStatusResponse> makeClusterStatusRequest(URI serverEndPoint);

/**
* Send lag information to remote Ksql server.
* Send a request to remote Ksql server to inquire to inquire about which state stores the
vpapavas marked this conversation as resolved.
Show resolved Hide resolved
* remote server maintains as an active and standby.
* @param serverEndPoint the remote destination.
* @param lagReportingMessage the host lag data
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"; you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.rest.server;

import io.confluent.ksql.execution.streams.RoutingFilter;
import io.confluent.ksql.util.KsqlHostInfo;
import org.apache.kafka.streams.state.HostInfo;

/**
* Filters for the active host.
*/
public class ActiveHostFilter implements RoutingFilter {

public ActiveHostFilter() {
}

/**
* Returns true if the host is the active host for a particular state store.
* @param activeHost the active host for a particular state store
* @param host The host for which the status is checked
* @param storeName Ignored
* @param partition Ignored
* @return true if the host is the active, false otherwise
*/
@Override
public boolean filter(
final HostInfo activeHost,
final KsqlHostInfo host,
final String storeName,
final int partition) {

return host.host().equals(activeHost.host()) && host.port() == activeHost.port();
vpapavas marked this conversation as resolved.
Show resolved Hide resolved
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import io.confluent.ksql.engine.KsqlEngine;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.util.HostStatus;
import io.confluent.ksql.util.KsqlHost;
import io.confluent.ksql.util.KsqlHostInfo;
import io.confluent.ksql.util.PersistentQueryMetadata;
import java.net.URI;
import java.net.URL;
Expand Down Expand Up @@ -80,12 +80,12 @@ public final class HeartbeatAgent {
private final ServiceContext serviceContext;
private final HeartbeatConfig config;
private final List<HostStatusListener> hostStatusListeners;
private final ConcurrentHashMap<KsqlHost, TreeMap<Long, HeartbeatInfo>> receivedHeartbeats;
private final ConcurrentHashMap<KsqlHost, HostStatus> hostsStatus;
private final ConcurrentHashMap<KsqlHostInfo, TreeMap<Long, HeartbeatInfo>> receivedHeartbeats;
private final ConcurrentHashMap<KsqlHostInfo, HostStatus> hostsStatus;
private final ScheduledExecutorService scheduledExecutorService;
private final ServiceManager serviceManager;
private final Clock clock;
private KsqlHost localHost;
private KsqlHostInfo localHost;
private URL localUrl;

public static HeartbeatAgent.Builder builder() {
Expand Down Expand Up @@ -114,7 +114,7 @@ private HeartbeatAgent(final KsqlEngine engine,
* @param hostInfo The host information of the remote Ksql server.
* @param timestamp The timestamp the heartbeat was sent.
*/
public void receiveHeartbeat(final KsqlHost hostInfo, final long timestamp) {
public void receiveHeartbeat(final KsqlHostInfo hostInfo, final long timestamp) {
final TreeMap<Long, HeartbeatInfo> heartbeats = receivedHeartbeats.computeIfAbsent(
hostInfo, key -> new TreeMap<>());
synchronized (heartbeats) {
Expand All @@ -127,12 +127,12 @@ public void receiveHeartbeat(final KsqlHost hostInfo, final long timestamp) {
* Returns the current view of the cluster containing all hosts discovered (whether alive or dead)
* @return status of discovered hosts
*/
public Map<KsqlHost, HostStatus> getHostsStatus() {
public Map<KsqlHostInfo, HostStatus> getHostsStatus() {
return Collections.unmodifiableMap(hostsStatus);
}

@VisibleForTesting
void setHostsStatus(final Map<KsqlHost, HostStatus> status) {
void setHostsStatus(final Map<KsqlHostInfo, HostStatus> status) {
hostsStatus.putAll(status);
}

Expand All @@ -156,7 +156,7 @@ void stopAgent() {

void setLocalAddress(final String applicationServer) {
final HostInfo hostInfo = ServerUtil.parseHostInfo(applicationServer);
this.localHost = new KsqlHost(hostInfo.host(), hostInfo.port());
this.localHost = new KsqlHostInfo(hostInfo.host(), hostInfo.port());
try {
this.localUrl = new URL(applicationServer);
} catch (final Exception e) {
Expand Down Expand Up @@ -226,30 +226,30 @@ private void processHeartbeats(final long windowStart, final long windowEnd) {
return;
}

for (Entry<KsqlHost, HostStatus> hostEntry: hostsStatus.entrySet()) {
final KsqlHost ksqlHost = hostEntry.getKey();
for (Entry<KsqlHostInfo, HostStatus> hostEntry: hostsStatus.entrySet()) {
final KsqlHostInfo ksqlHostInfo = hostEntry.getKey();
final HostStatus hostStatus = hostEntry.getValue();
if (ksqlHost.equals(localHost)) {
if (ksqlHostInfo.equals(localHost)) {
continue;
}
final TreeMap<Long, HeartbeatInfo> heartbeats = receivedHeartbeats.get(ksqlHost);
final TreeMap<Long, HeartbeatInfo> heartbeats = receivedHeartbeats.get(ksqlHostInfo);
//For previously discovered hosts, if they have not received any heartbeats, mark them dead
if (heartbeats == null || heartbeats.isEmpty()) {
hostsStatus.computeIfPresent(ksqlHost, (host, status) -> status.withHostAlive(false));
hostsStatus.computeIfPresent(ksqlHostInfo, (host, status) -> status.withHostAlive(false));
} else {
final TreeMap<Long, HeartbeatInfo> copy;
synchronized (heartbeats) {
LOG.debug("Process heartbeats: {} of host: {}", heartbeats, ksqlHost);
LOG.debug("Process heartbeats: {} of host: {}", heartbeats, ksqlHostInfo);
// 1. remove heartbeats older than window
heartbeats.headMap(windowStart).clear();
copy = new TreeMap<>(heartbeats.subMap(windowStart, true, windowEnd, true));
}
// 2. count consecutive missed heartbeats and mark as alive or dead
final boolean isAlive = decideStatus(ksqlHost, windowStart, windowEnd, copy);
final boolean isAlive = decideStatus(ksqlHostInfo, windowStart, windowEnd, copy);
if (!isAlive) {
LOG.info("Host: {} marked as dead.", ksqlHost);
LOG.info("Host: {} marked as dead.", ksqlHostInfo);
}
hostsStatus.computeIfPresent(ksqlHost, (host, status) -> status
hostsStatus.computeIfPresent(ksqlHostInfo, (host, status) -> status
.withHostAlive(isAlive).withLastStatusUpdateMs(windowEnd));
}
}
Expand All @@ -259,7 +259,7 @@ private void processHeartbeats(final long windowStart, final long windowEnd) {
}

private boolean decideStatus(
final KsqlHost ksqlHost, final long windowStart, final long windowEnd,
final KsqlHostInfo ksqlHostInfo, final long windowStart, final long windowEnd,
final TreeMap<Long, HeartbeatInfo> heartbeats
) {
long missedCount = 0;
Expand Down Expand Up @@ -289,7 +289,7 @@ private boolean decideStatus(
if (windowEnd - prev - 1 > 0) {
missedCount = (windowEnd - prev - 1) / config.heartbeatSendIntervalMs;
}
LOG.debug("Host: {} has {} missing heartbeats", ksqlHost, missedCount);
LOG.debug("Host: {} has {} missing heartbeats", ksqlHostInfo, missedCount);
return (missedCount < config.heartbeatMissedThreshold);
}
}
Expand All @@ -306,8 +306,8 @@ class SendHeartbeatService extends AbstractScheduledService {

@Override
protected void runOneIteration() {
for (Entry<KsqlHost, HostStatus> hostStatusEntry: hostsStatus.entrySet()) {
final KsqlHost remoteHost = hostStatusEntry.getKey();
for (Entry<KsqlHostInfo, HostStatus> hostStatusEntry: hostsStatus.entrySet()) {
final KsqlHostInfo remoteHost = hostStatusEntry.getKey();
try {
if (!remoteHost.equals(localHost)) {
final URI remoteUri = ServerUtil.buildRemoteUri(
Expand Down Expand Up @@ -364,7 +364,7 @@ protected void runOneIteration() {
// Only add to map if it is the first time it is discovered. Design decision to
// optimistically consider every newly discovered server as alive to avoid situations of
// unavailability until the heartbeating kicks in.
final KsqlHost host = new KsqlHost(hostInfo.host(), hostInfo.port());
final KsqlHostInfo host = new KsqlHostInfo(hostInfo.host(), hostInfo.port());
hostsStatus.computeIfAbsent(host, key -> new HostStatus(true, clock.millis()));
}
} catch (Throwable t) {
Expand Down Expand Up @@ -492,6 +492,6 @@ public interface HostStatusListener {
* Call when the map of host statuses are updated
* @param hostsStatusMap The new host status map
*/
void onHostStatusUpdated(Map<KsqlHost, HostStatus> hostsStatusMap);
void onHostStatusUpdated(Map<KsqlHostInfo, HostStatus> hostsStatusMap);
}
}
Loading