Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge branch 'main' into 4.0-feature-4904
Browse files Browse the repository at this point in the history
kevinrr888 committed Nov 21, 2024
2 parents 8e59023 + 5de04df commit dc707e7
Showing 76 changed files with 2,278 additions and 1,186 deletions.
442 changes: 229 additions & 213 deletions assemble/bin/accumulo-cluster

Large diffs are not rendered by default.

103 changes: 79 additions & 24 deletions assemble/bin/accumulo-service
Original file line number Diff line number Diff line change
@@ -58,31 +58,65 @@ function rotate_log() {
fi
}

function get_group() {
# Find the group parameter if any
GROUP_PATTERN="^(compactor.group|sserver.group|tserver.group)=(.*)$"
group="default"
for param in "$@"; do
if [[ $param =~ $GROUP_PATTERN ]]; then
group="${BASH_REMATCH[2]}"
fi
done
echo "${group}"
}

function start_service() {
local service_type=$1
local service_name=$2
shift 2

local pid_file="${ACCUMULO_PID_DIR}/accumulo-${service_name}.pid"
if [[ -f $pid_file ]]; then
pid=$(cat "$pid_file")
if kill -0 "$pid" 2>/dev/null; then
echo "$HOST : ${service_name} already running (${pid})"
exit 0
fi
local build_service_name="false"
if [[ -n $service_name ]]; then
# if service_name is supplied, then we are only starting one instance
servers_per_host=1
else
build_service_name="true"
servers_per_host=${ACCUMULO_CLUSTER_ARG:-1}
fi
echo "Starting $service_name on $HOST"

if [[ ${service_type} == "manager" ]]; then
"${bin}/accumulo" org.apache.accumulo.manager.state.SetGoalState NORMAL
fi
outfile="${ACCUMULO_LOG_DIR}/${service_name}_${HOST}.out"
errfile="${ACCUMULO_LOG_DIR}/${service_name}_${HOST}.err"
rotate_log "$outfile"
rotate_log "$errfile"
group=$(get_group "$@")

nohup "${bin}/accumulo" "$service_type" "$@" >"$outfile" 2>"$errfile" </dev/null &
echo "$!" >"${pid_file}"
for ((process_num = 1; process_num <= servers_per_host; process_num++)); do
if [[ ${build_service_name} == "true" ]]; then
service_name="${service_type}_${group}_${process_num}"
fi
# The ACCUMULO_SERVICE_INSTANCE variable is used in
# accumulo-env.sh to set parameters on the command
# line.
export ACCUMULO_SERVICE_INSTANCE="${service_name}"

local pid_file="${ACCUMULO_PID_DIR}/accumulo-${service_name}.pid"
if [[ -f $pid_file ]]; then
pid=$(cat "$pid_file")
if kill -0 "$pid" 2>/dev/null; then
echo "$HOST : ${service_name} already running (${pid})"
exit 0
fi
fi
echo "Starting $service_name on $HOST"

if [[ ${service_type} == "manager" ]]; then
"${bin}/accumulo" org.apache.accumulo.manager.state.SetGoalState NORMAL
fi
outfile="${ACCUMULO_LOG_DIR}/${service_name}_${HOST}.out"
errfile="${ACCUMULO_LOG_DIR}/${service_name}_${HOST}.err"
rotate_log "$outfile"
rotate_log "$errfile"

nohup "${bin}/accumulo" "$service_type" "$@" >"$outfile" 2>"$errfile" </dev/null &
echo "$!" >"${pid_file}"

done

# Check the max open files limit and selectively warn
max_files_open=$(ulimit -n)
@@ -125,6 +159,18 @@ function stop_service() {
local pid_file="${ACCUMULO_PID_DIR}/accumulo-${process}.pid"
control_process "TERM" "$process" "$pid_file"
done
elif [[ -n $ACCUMULO_CLUSTER_ARG ]]; then

servers_per_host=${ACCUMULO_CLUSTER_ARG:-1}
group=$(get_group "$@")

for ((process_num = 1; process_num <= servers_per_host; process_num++)); do
service_name="${service_type}_${group}_${process_num}"
echo "Stopping service process: $service_name"
local pid_file="${ACCUMULO_PID_DIR}/accumulo-${service_name}.pid"
control_process "TERM" "$service_name" "$pid_file"
done

else
echo "Stopping service process: $service_name"
local pid_file="${ACCUMULO_PID_DIR}/accumulo-${service_name}.pid"
@@ -142,6 +188,18 @@ function kill_service() {
local pid_file="${ACCUMULO_PID_DIR}/accumulo-${process}.pid"
control_process "KILL" "$process" "$pid_file"
done
elif [[ -n $ACCUMULO_CLUSTER_ARG ]]; then

servers_per_host=${ACCUMULO_CLUSTER_ARG:-1}
group=$(get_group "$@")

for ((process_num = 1; process_num <= servers_per_host; process_num++)); do
service_name="${service_type}_${group}_${process_num}"
echo "Stopping service process: $service_name"
local pid_file="${ACCUMULO_PID_DIR}/accumulo-${service_name}.pid"
control_process "KILL" "$service_name" "$pid_file"
done

else
local pid_file="${ACCUMULO_PID_DIR}/accumulo-${service_name}.pid"
control_process "KILL" "$service_name" "$pid_file"
@@ -198,11 +256,11 @@ function main() {
local service_type="$1"
local command_name="$2"
shift 2
local service_name=$service_type
local service_name=""
local all_flag=false

# Check and see if accumulo-cluster is calling this script
if [[ -z $ACCUMULO_SERVICE_INSTANCE ]]; then
if [[ -z $ACCUMULO_CLUSTER_ARG ]]; then
# The rest of the arguments are from a user
if [[ $1 == "--all" ]]; then
all_flag=true
@@ -212,9 +270,6 @@ function main() {
service_name="$1"
fi
fi
# Use the special bash env var from accumulo-cluster
else
service_name="${service_type}${ACCUMULO_SERVICE_INSTANCE}"
fi

case "$service_type" in
@@ -227,10 +282,10 @@ function main() {
start_service "$service_type" "$service_name" "$@"
;;
stop)
stop_service "$service_type" "$service_name" $all_flag
stop_service "$service_type" "$service_name" $all_flag "$@"
;;
kill)
kill_service "$service_type" "$service_name" $all_flag
kill_service "$service_type" "$service_name" $all_flag "$@"
;;
list)
list_processes "$service_type"
6 changes: 3 additions & 3 deletions assemble/conf/accumulo-env.sh
Original file line number Diff line number Diff line change
@@ -101,10 +101,10 @@ esac

## JVM options set for logging. Review log4j2.properties file to see how they are used.
JAVA_OPTS=("-Daccumulo.log.dir=${ACCUMULO_LOG_DIR}"
"-Daccumulo.application=${cmd}${ACCUMULO_SERVICE_INSTANCE}_$(hostname)"
"-Daccumulo.metrics.service.instance=${cmd}${ACCUMULO_SERVICE_INSTANCE}"
"-Daccumulo.application=${ACCUMULO_SERVICE_INSTANCE}_$(hostname)"
"-Daccumulo.metrics.service.instance=${ACCUMULO_SERVICE_INSTANCE}"
"-Dlog4j2.contextSelector=org.apache.logging.log4j.core.async.AsyncLoggerContextSelector"
"-Dotel.service.name=${cmd}${ACCUMULO_SERVICE_INSTANCE}"
"-Dotel.service.name=${ACCUMULO_SERVICE_INSTANCE}"
"${JAVA_OPTS[@]}"
)

1 change: 0 additions & 1 deletion core/src/main/java/org/apache/accumulo/core/Constants.java
Original file line number Diff line number Diff line change
@@ -47,7 +47,6 @@ public class Constants {
public static final String ZTABLE_NAMESPACE = "/namespace";

public static final String ZNAMESPACES = "/namespaces";
public static final String ZNAMESPACE_NAME = "/name";

public static final String ZMANAGERS = "/managers";
public static final String ZMANAGER_LOCK = ZMANAGERS + "/lock";
Original file line number Diff line number Diff line change
@@ -231,8 +231,8 @@ Map<String,String> modifyProperties(Consumer<Map<String,String>> mapMutator)
ServerId getServer(ServerId.Type type, String resourceGroup, String host, int port);

/**
* Returns all servers of the given types. For the Manager, the result will contain only one
* element for the current active Manager.
* Returns all servers of the given types. For the Manager, Monitor, and Garbage Collector, the
* result will contain only one element for the current active process.
*
* @return set of servers of the supplied type
* @since 4.0.0
@@ -243,8 +243,7 @@ Map<String,String> modifyProperties(Consumer<Map<String,String>> mapMutator)
* Returns the servers of a given type that match the given criteria
*
* @param resourceGroupPredicate only returns servers where the resource group matches this
* predicate. For the manager it does not have a resoruce group and this parameters is not
* used.
* predicate. For the Manager, Monitor, and Garbage Collector, this parameter is not used.
* @param hostPortPredicate only returns servers where its host and port match this predicate.
* @return set of servers of the supplied type matching the supplied test
* @since 4.0.0
Original file line number Diff line number Diff line change
@@ -32,13 +32,12 @@
public final class ServerId implements Comparable<ServerId> {

/**
* Server process type names that a client can be expected to interact with. Clients are not
* expected to interact directly with the GarbageCollector or Monitor processes.
* Server process type names.
*
* @since 4.0.0
*/
public enum Type {
MANAGER, COMPACTOR, SCAN_SERVER, TABLET_SERVER;
MANAGER, MONITOR, GARBAGE_COLLECTOR, COMPACTOR, SCAN_SERVER, TABLET_SERVER;
}

private final Type type;
Original file line number Diff line number Diff line change
@@ -144,6 +144,7 @@ public class ClientContext implements AccumuloClient {
private final Supplier<SslConnectionParams> sslSupplier;
private final Supplier<ScanServerSelector> scanServerSelectorSupplier;
private final Supplier<ServiceLockPaths> serverPaths;
private final NamespaceMapping namespaces;
private TCredentials rpcCreds;
private ThriftTransportPool thriftTransportPool;
private ZookeeperLockChecker zkLockChecker;
@@ -254,6 +255,7 @@ public ClientContext(SingletonReservation reservation, ClientInfo info,
clientThreadPools = ThreadPools.getClientThreadPools(ueh);
}
}
this.namespaces = new NamespaceMapping(this);
}

public Ample getAmple() {
@@ -1076,4 +1078,8 @@ public ServiceLockPaths getServerPaths() {
return this.serverPaths.get();
}

public NamespaceMapping getNamespaces() {
return namespaces;
}

}
Original file line number Diff line number Diff line change
@@ -495,11 +495,13 @@ public ServerId getServer(ServerId.Type type, String resourceGroup, String host,
throw new IllegalStateException("Multiple servers matching provided address");
}
case MANAGER:
Set<ServerId> managers = getServers(type, rg2 -> true, hp);
if (managers.isEmpty()) {
case MONITOR:
case GARBAGE_COLLECTOR:
Set<ServerId> server = getServers(type, rg2 -> true, hp);
if (server.isEmpty()) {
return null;
} else {
return managers.iterator().next();
return server.iterator().next();
}
case SCAN_SERVER:
Set<ServiceLockPath> sservers = context.getServerPaths().getScanServer(rg, hp, true);
@@ -511,7 +513,7 @@ public ServerId getServer(ServerId.Type type, String resourceGroup, String host,
throw new IllegalStateException("Multiple servers matching provided address");
}
case TABLET_SERVER:
Set<ServiceLockPath> tservers = context.getServerPaths().getScanServer(rg, hp, true);
Set<ServiceLockPath> tservers = context.getServerPaths().getTabletServer(rg, hp, true);
if (tservers.isEmpty()) {
return null;
} else if (tservers.size() == 1) {
@@ -569,6 +571,36 @@ private Set<ServerId> getServers(ServerId.Type type, Predicate<String> resourceG
}
}
break;
case MONITOR:
ServiceLockPath mon = context.getServerPaths().getMonitor(true);
if (mon != null) {
Optional<ServiceLockData> sld = context.getZooCache().getLockData(mon);
String location = null;
if (sld.isPresent()) {
location = sld.orElseThrow().getAddressString(ThriftService.NONE);
if (addressSelector.getPredicate().test(location)) {
HostAndPort hp = HostAndPort.fromString(location);
results.add(new ServerId(type, Constants.DEFAULT_RESOURCE_GROUP_NAME, hp.getHost(),
hp.getPort()));
}
}
}
break;
case GARBAGE_COLLECTOR:
ServiceLockPath gc = context.getServerPaths().getGarbageCollector(true);
if (gc != null) {
Optional<ServiceLockData> sld = context.getZooCache().getLockData(gc);
String location = null;
if (sld.isPresent()) {
location = sld.orElseThrow().getAddressString(ThriftService.GC);
if (addressSelector.getPredicate().test(location)) {
HostAndPort hp = HostAndPort.fromString(location);
results.add(new ServerId(type, Constants.DEFAULT_RESOURCE_GROUP_NAME, hp.getHost(),
hp.getPort()));
}
}
}
break;
case SCAN_SERVER:
context.getServerPaths().getScanServer(resourceGroupPredicate::test, addressSelector, true)
.forEach(s -> results.add(createServerId(type, s)));
Loading

0 comments on commit dc707e7

Please sign in to comment.