feat(connector): display metrics to grafana (#8684)
WillyKidd authored Mar 27, 2023
1 parent 96ff27f commit b3f4fd9
Showing 10 changed files with 119 additions and 35 deletions.
21 changes: 21 additions & 0 deletions grafana/risingwave-dashboard.dashboard.py
@@ -2609,6 +2609,26 @@ def section_memory_manager(outer_panels):
),
]

def section_connector_node(outer_panels):
    panels = outer_panels.sub_panel()
    return [
        outer_panels.row_collapsed(
            "Connector Node",
            [
                panels.timeseries_rowsps(
                    "Connector Source Throughput(rows)",
                    "",
                    [
                        panels.target(
                            f"rate({metric('connector_source_rows_received')}[$__interval])",
                            "{{source_type}} @ {{source_id}}",
                        ),
                    ],
                ),
            ],
        )
    ]

templating = Templating()
if namespace_filter_enabled:
    templating = Templating(
@@ -2666,5 +2686,6 @@ def section_memory_manager(outer_panels):
*section_grpc_hummock_meta_client(panels),
*section_frontend(panels),
*section_memory_manager(panels),
*section_connector_node(panels),
],
).auto_panel_ids()
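
For reference, the panel added above plots rate(connector_source_rows_received[$__interval]) and labels each series by source_type and source_id. A minimal sketch of the producing side (incSourceRowsReceived comes from the ConnectorNodeMetrics changes later in this commit; the "MYSQL" source type and source id "1" are hypothetical label values):

    import com.risingwave.metrics.ConnectorNodeMetrics;

    // Illustrative only: every batch handed to the event stream is counted into
    // connector_source_rows_received, which the Grafana panel turns into rows/s via rate().
    public class SourceThroughputExample {
        public static void reportBatch(int rowsInBatch) {
            // "MYSQL" and "1" are made-up label values for source_type and source_id.
            ConnectorNodeMetrics.incSourceRowsReceived("MYSQL", "1", rowsInBatch);
        }
    }

Because the metric is a monotonically increasing counter, the panel divides by the query interval via rate() instead of plotting the raw value.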
2 changes: 1 addition & 1 deletion grafana/risingwave-dashboard.json

Large diffs are not rendered by default.

@@ -207,6 +207,6 @@ private void bindSink(SinkConfig sinkConfig, ConnectorServiceProto.SinkPayloadFo
deserializer = new StreamChunkDeserializer(tableSchema);
break;
}
ConnectorNodeMetrics.incActiveConnections(sinkConfig.getConnectorType(), "node1");
ConnectorNodeMetrics.incActiveSinkConnections(sinkConfig.getConnectorType(), "node1");
}
}
@@ -16,42 +16,55 @@

import static io.grpc.Status.INTERNAL;

import com.sun.management.OperatingSystemMXBean;
import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.Counter;
import io.prometheus.client.Gauge;
import io.prometheus.client.exporter.HTTPServer;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;
import java.net.InetSocketAddress;

public class ConnectorNodeMetrics {
private static final Counter activeConnections =
private static final Counter activeSourceConnections =
Counter.build()
.name("active_connections")
.name("active_source_connections")
.labelNames("source_type", "ip")
.help("Number of active source connections")
.register();

private static final Counter activeSinkConnections =
Counter.build()
.name("active_sink_connections")
.labelNames("sink_type", "ip")
.help("Number of active connections")
.help("Number of active sink connections")
.register();

private static final Counter totalConnections =
private static final Counter totalSinkConnections =
Counter.build()
.name("total_connections")
.name("total_sink_connections")
.labelNames("sink_type", "ip")
.help("Number of total connections")
.register();
private static final Gauge cpuUsage =
Gauge.build()
.name("cpu_usage")
.labelNames("node_id")
.help("CPU usage in percentage")
private static final Counter cpuUsage =
Counter.build()
.name("process_cpu_seconds_total")
.labelNames("job")
.help("Total user and system CPU time spent in seconds.")
.register();
private static final Gauge ramUsage =
Gauge.build()
.name("ram_usage")
.labelNames("node_id")
.name("process_resident_memory_bytes")
.labelNames("job")
.help("RAM usage in bytes")
.register();

private static final Counter sourceRowsReceived =
Counter.build()
.name("connector_source_rows_received")
.labelNames("source_type", "source_id")
.help("Number of rows received by source")
.register();
private static final Counter sinkRowsReceived =
Counter.build()
.name("sink_rows_received")
@@ -68,14 +81,15 @@ public class ConnectorNodeMetrics {
static class PeriodicMetricsCollector extends Thread {
private final int interval;
private final OperatingSystemMXBean osBean;
private final String nodeId;
private final String job;

public PeriodicMetricsCollector(int intervalMillis, String nodeId) {
public PeriodicMetricsCollector(int intervalMillis, String job) {
this.interval = intervalMillis;
this.nodeId = nodeId;
this.osBean = ManagementFactory.getOperatingSystemMXBean();
this.job = job;
this.osBean = (OperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean();
}

@SuppressWarnings({"InfiniteLoopStatement", "BusyWait"})
@Override
public void run() {
while (true) {
@@ -89,55 +103,65 @@ public void run() {
}

private void collect() {
double cpuUsage = osBean.getSystemLoadAverage();
ConnectorNodeMetrics.cpuUsage.labels(nodeId).set(cpuUsage);
double cpuTotal = osBean.getProcessCpuTime() / 1000000000.0;
double cpuPast = ConnectorNodeMetrics.cpuUsage.labels(job).get();
ConnectorNodeMetrics.cpuUsage.labels(job).inc(cpuTotal - cpuPast);
long ramUsageBytes =
Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
ConnectorNodeMetrics.ramUsage.labels(nodeId).set(ramUsageBytes);
ConnectorNodeMetrics.ramUsage.labels(job).set(ramUsageBytes);
}
}

public static void startHTTPServer(int port) {
CollectorRegistry registry = new CollectorRegistry();
registry.register(activeConnections);
registry.register(activeSourceConnections);
registry.register(activeSinkConnections);
registry.register(sourceRowsReceived);
registry.register(cpuUsage);
registry.register(ramUsage);
PeriodicMetricsCollector collector = new PeriodicMetricsCollector(1000, "node1");
PeriodicMetricsCollector collector = new PeriodicMetricsCollector(1000, "connector");
collector.start();

try {
HTTPServer server = new HTTPServer(new InetSocketAddress("localhost", port), registry);
new HTTPServer(new InetSocketAddress("localhost", port), registry);
} catch (IOException e) {
throw INTERNAL.withDescription("Failed to start HTTP server")
.withCause(e)
.asRuntimeException();
}
}

public static void incActiveConnections(String sinkType, String ip) {
activeConnections.labels(sinkType, ip).inc();
public static void incActiveSourceConnections(String sourceType, String ip) {
activeSourceConnections.labels(sourceType, ip).inc();
}

public static void decActiveSourceConnections(String sourceType, String ip) {
activeSourceConnections.remove(sourceType, ip);
}

public static void incActiveSinkConnections(String sinkType, String ip) {
activeSinkConnections.labels(sinkType, ip).inc();
}

public static void decActiveConnections(String sinkType, String ip) {
activeConnections.remove(sinkType, ip);
activeSinkConnections.remove(sinkType, ip);
}

public static void incSourceRowsReceived(String sourceType, String sourceId, double amt) {
sourceRowsReceived.labels(sourceType, sourceId).inc(amt);
}

public static void incSinkRowsReceived() {
sinkRowsReceived.inc();
}

public static void incTotalConnections(String sinkType, String ip) {
totalConnections.labels(sinkType, ip).inc();
totalSinkConnections.labels(sinkType, ip).inc();
}

public static void incErrorCount(String sinkType, String ip) {
errorCount.labels(sinkType, ip).inc();
}

public static void setCpuUsage(String ip, double cpuUsagePercentage) {
cpuUsage.labels(ip).set(cpuUsagePercentage);
}

public static void setRamUsage(String ip, long usedRamInBytes) {
ramUsage.labels(ip).set(usedRamInBytes);
}
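
As a usage sketch (not part of this commit), the connection counters above are meant to bracket a connection's lifetime. Method names are taken from ConnectorNodeMetrics; the try/finally and the "POSTGRES"/"10.0.0.1" label values are illustrative assumptions:

    import com.risingwave.metrics.ConnectorNodeMetrics;

    // Illustrative only. Note that decActiveSourceConnections() calls remove() on the
    // counter, so the series for this (source_type, ip) pair disappears from /metrics
    // once the connection ends rather than being decremented.
    public class ConnectionLifecycleExample {
        public static void runSource(Runnable sourceLoop) {
            String sourceType = "POSTGRES"; // hypothetical label value
            String host = "10.0.0.1";       // hypothetical label value
            ConnectorNodeMetrics.incActiveSourceConnections(sourceType, host);
            try {
                sourceLoop.run(); // long-running event stream
            } finally {
                ConnectorNodeMetrics.decActiveSourceConnections(sourceType, host);
            }
        }
    }

The source handler in this commit increments before startSource and decrements after it returns; the try/finally here is just a defensive variant of the same bracket.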
@@ -15,6 +15,7 @@
package com.risingwave.sourcenode;

import com.risingwave.connector.api.source.SourceTypeE;
import com.risingwave.metrics.ConnectorNodeMetrics;
import com.risingwave.proto.ConnectorServiceProto;
import com.risingwave.proto.Data.DataType;
import com.risingwave.sourcenode.common.DbzConnectorConfig;
@@ -53,9 +54,15 @@ public void handle(ConnectorServiceProto.GetEventStreamRequest request) {
startRequest.getSourceId(),
startRequest.getStartOffset(),
startRequest.getPropertiesMap());
ConnectorNodeMetrics.incActiveSourceConnections(
startRequest.getSourceType().toString(),
startRequest.getPropertiesMap().get(DbzConnectorConfig.HOST));
handler.startSource(
(ServerCallStreamObserver<ConnectorServiceProto.GetEventStreamResponse>)
responseObserver);
ConnectorNodeMetrics.decActiveSourceConnections(
startRequest.getSourceType().toString(),
startRequest.getPropertiesMap().get(DbzConnectorConfig.HOST));
} catch (Throwable t) {
LOG.error("failed to start source", t);
responseObserver.onError(t);
@@ -62,12 +62,19 @@ public static Map<String, String> extractDebeziumProperties(Map<String, String>
}

private final long sourceId;

private final SourceTypeE sourceType;

private final Properties resolvedDbzProps;

public long getSourceId() {
return sourceId;
}

public SourceTypeE getSourceType() {
return sourceType;
}

public Properties getResolvedDebeziumProps() {
return resolvedDbzProps;
}
@@ -115,6 +122,7 @@ public DbzConnectorConfig(
dbzProps.putAll(otherProps);

this.sourceId = sourceId;
this.sourceType = source;
this.resolvedDbzProps = dbzProps;
}

@@ -15,6 +15,7 @@
package com.risingwave.sourcenode.core;

import com.risingwave.connector.api.source.SourceHandler;
import com.risingwave.metrics.ConnectorNodeMetrics;
import com.risingwave.proto.ConnectorServiceProto.GetEventStreamResponse;
import com.risingwave.sourcenode.common.DbzConnectorConfig;
import io.grpc.Context;
@@ -57,7 +58,10 @@ public void startSource(ServerCallStreamObserver<GetEventStreamResponse> respons
// wait a bit to avoid OOM
Thread.sleep(500);
}

ConnectorNodeMetrics.incSourceRowsReceived(
config.getSourceType().toString(),
String.valueOf(config.getSourceId()),
resp.getEventsCount());
LOG.debug(
"Engine#{}: emit one chunk {} events to network ",
config.getSourceId(),
6 changes: 6 additions & 0 deletions risedev.yml
@@ -880,6 +880,9 @@ template:
# Frontend used by this Prometheus instance
provide-frontend: "frontend*"

# Connector-node used by this Prometheus instance
provide-connector-node: "connector*"

frontend:
# Advertise address of frontend
address: "127.0.0.1"
@@ -940,6 +943,9 @@ template:
# Connector node listen port
port: 50051

# Prometheus exporter listen port
exporter-port: 50052

# Id of this instance
id: connector-${port}
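
With exporter-port available in the template, the connector node is expected to expose Prometheus metrics on that port. A minimal bootstrap sketch (startHTTPServer is defined in ConnectorNodeMetrics above; how the port value actually reaches the connector node is not shown in this excerpt, so the hard-coded 50052 is an assumption matching the template default):

    import com.risingwave.metrics.ConnectorNodeMetrics;

    // Illustrative only: serve /metrics on the exporter port that risedev writes into the
    // generated Prometheus scrape config (see prometheus_gen.rs below).
    public class ExporterBootstrapExample {
        public static void main(String[] args) {
            int exporterPort = 50052; // assumed; matches the risedev template default above
            ConnectorNodeMetrics.startHTTPServer(exporterPort);
            // ... start the gRPC connector service on port 50051 ...
        }
    }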

12 changes: 12 additions & 0 deletions src/risedevtool/src/config_gen/prometheus_gen.rs
@@ -79,6 +79,14 @@ impl PrometheusGen {
.map(|node| format!("\"{}:{}\"", node.address, 9644))
.join(",");

let connector_node_targets = config
.provide_connector_node
.as_ref()
.unwrap()
.iter()
.map(|node| format!("\"{}:{}\"", node.address, node.exporter_port))
.join(",");

let now = Local::now().format("%Y%m%d-%H%M%S");

let remote_write = if config.remote_write {
@@ -143,6 +151,10 @@
- job_name: redpanda
static_configs:
- targets: [{redpanda_targets}]
- job_name: connector-node
static_configs:
- targets: [{connector_node_targets}]
"#,
)
}
2 changes: 2 additions & 0 deletions src/risedevtool/src/service_config.rs
@@ -186,6 +186,7 @@ pub struct PrometheusConfig {
pub provide_etcd: Option<Vec<EtcdConfig>>,
pub provide_redpanda: Option<Vec<RedPandaConfig>>,
pub provide_frontend: Option<Vec<FrontendConfig>>,
pub provide_connector_node: Option<Vec<ConnectorNodeConfig>>,
}

#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
@@ -320,6 +321,7 @@ pub struct ConnectorNodeConfig {
phantom_use: Option<String>,
pub id: String,
pub port: u16,
pub exporter_port: u16,
pub address: String,
}

