Skip to content

Commit

Permalink
Stricter checks before transitioning table state
Browse files Browse the repository at this point in the history
Closes apache#4132
Prior to these changes, users were able to manually transition a table from the NEW state to another state. More specifically, calls to TableOperations.online() and TableOperations.offline() when the table was in the NEW state were acceptable. Users should not be able to transition a table from the NEW state as this should only be done by the system upon successfully creating/cloning/importing the table.
Changes:
- Added new param expectedCurrStates to TableManager.transitionTableState()
- Users can now only manually change a table from [ONLINE/OFFLINE]->[ONLINE/OFFLINE] (FateServiceHandler)
- Added field expectedCurrStates to ChangeTableState
- Table clones, creations, and imports now explicitly expect the current state before completion to be NEW (FinishCloneTable, FinishCreateTable, FinishImportTable)
- Table deletions now explicitly expect the current state before deletion to be only ONLINE or OFFLINE (DeleteTable)
  • Loading branch information
kevinrr888 committed Jan 17, 2024
1 parent e67cff2 commit 4d01ec8
Show file tree
Hide file tree
Showing 7 changed files with 46 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -134,27 +134,28 @@ public TableState getTableState(TableId tableId) {
return tableStateCache.get(tableId);
}

public synchronized void transitionTableState(final TableId tableId, final TableState newState) {
public synchronized void transitionTableState(final TableId tableId, final TableState newState,
final Set<TableState> expectedCurrStates) {
Preconditions.checkArgument(newState != TableState.UNKNOWN);
String statePath = zkRoot + Constants.ZTABLES + "/" + tableId + Constants.ZTABLE_STATE;

try {
zoo.mutateOrCreate(statePath, newState.name().getBytes(UTF_8), oldData -> {
TableState oldState = TableState.UNKNOWN;
if (oldData != null) {
oldState = TableState.valueOf(new String(oldData, UTF_8));
zoo.mutateOrCreate(statePath, newState.name().getBytes(UTF_8), currData -> {
TableState currState = TableState.UNKNOWN;
if (currData != null) {
currState = TableState.valueOf(new String(currData, UTF_8));
}

// this check makes the transition operation idempotent
if (oldState == newState) {
if (currState == newState) {
return null; // already at desired state, so nothing to do
}

boolean transition = true;
// +--------+
// v |
// NEW -> (ONLINE|OFFLINE)+--- DELETING
switch (oldState) {
switch (currState) {
case NEW:
transition = (newState == TableState.OFFLINE || newState == TableState.ONLINE);
break;
Expand All @@ -168,10 +169,10 @@ public synchronized void transitionTableState(final TableId tableId, final Table
transition = false;
break;
}
if (!transition) {
throw new IllegalTableTransitionException(oldState, newState);
if (!transition || !expectedCurrStates.contains(currState)) {
throw new IllegalTableTransitionException(currState, newState);
}
log.debug("Transitioning state for table {} from {} to {}", tableId, oldState, newState);
log.debug("Transitioning state for table {} from {} to {}", tableId, currState, newState);
return newState.name().getBytes(UTF_8);
});
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import org.apache.accumulo.core.data.NamespaceId;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.fate.ReadOnlyTStore.TStatus;
import org.apache.accumulo.core.manager.state.tables.TableState;
import org.apache.accumulo.core.manager.thrift.FateOperation;
import org.apache.accumulo.core.manager.thrift.FateService;
import org.apache.accumulo.core.manager.thrift.ThriftPropertyException;
Expand Down Expand Up @@ -388,9 +389,11 @@ public void executeFateOperation(TInfo tinfo, TCredentials c, long opid, FateOpe
}

goalMessage += "Online table " + tableId;
final Set<TableState> expectedCurrStates = Set.of(TableState.ONLINE, TableState.OFFLINE);
manager.fate().seedTransaction(op.toString(), opid,
new TraceRepo<>(new ChangeTableState(namespaceId, tableId, tableOp)), autoCleanup,
goalMessage);
new TraceRepo<>(
new ChangeTableState(namespaceId, tableId, tableOp, expectedCurrStates)),
autoCleanup, goalMessage);
break;
}
case TABLE_OFFLINE: {
Expand All @@ -413,9 +416,11 @@ public void executeFateOperation(TInfo tinfo, TCredentials c, long opid, FateOpe
}

goalMessage += "Offline table " + tableId;
final Set<TableState> expectedCurrStates = Set.of(TableState.ONLINE, TableState.OFFLINE);
manager.fate().seedTransaction(op.toString(), opid,
new TraceRepo<>(new ChangeTableState(namespaceId, tableId, tableOp)), autoCleanup,
goalMessage);
new TraceRepo<>(
new ChangeTableState(namespaceId, tableId, tableOp, expectedCurrStates)),
autoCleanup, goalMessage);
break;
}
case TABLE_MERGE: {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package org.apache.accumulo.manager.tableOps;

import java.util.Set;

import org.apache.accumulo.core.clientImpl.thrift.TableOperation;
import org.apache.accumulo.core.data.NamespaceId;
import org.apache.accumulo.core.data.TableId;
Expand All @@ -32,11 +34,14 @@ public class ChangeTableState extends ManagerRepo {
private TableId tableId;
private NamespaceId namespaceId;
private TableOperation top;
private final Set<TableState> expectedCurrStates;

public ChangeTableState(NamespaceId namespaceId, TableId tableId, TableOperation top) {
public ChangeTableState(NamespaceId namespaceId, TableId tableId, TableOperation top,
Set<TableState> expectedCurrStates) {
this.tableId = tableId;
this.namespaceId = namespaceId;
this.top = top;
this.expectedCurrStates = expectedCurrStates;

if (top != TableOperation.ONLINE && top != TableOperation.OFFLINE) {
throw new IllegalArgumentException(top.toString());
Expand All @@ -58,7 +63,7 @@ public Repo<Manager> call(long tid, Manager env) {
ts = TableState.OFFLINE;
}

env.getTableManager().transitionTableState(tableId, ts);
env.getTableManager().transitionTableState(tableId, ts, expectedCurrStates);
Utils.unreserveNamespace(env, namespaceId, tid, false);
Utils.unreserveTable(env, tableId, tid, true);
LoggerFactory.getLogger(ChangeTableState.class).debug("Changed table state {} {}", tableId, ts);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package org.apache.accumulo.manager.tableOps.clone;

import java.util.Set;

import org.apache.accumulo.core.fate.Repo;
import org.apache.accumulo.core.manager.state.tables.TableState;
import org.apache.accumulo.manager.Manager;
Expand Down Expand Up @@ -47,10 +49,13 @@ public Repo<Manager> call(long tid, Manager environment) {
// may never create files.. therefore there is no need to consume namenode space w/ directories
// that are not used... tablet will create directories as needed

final Set<TableState> expectedCurrStates = Set.of(TableState.NEW);
if (cloneInfo.keepOffline) {
environment.getTableManager().transitionTableState(cloneInfo.tableId, TableState.OFFLINE);
environment.getTableManager().transitionTableState(cloneInfo.tableId, TableState.OFFLINE,
expectedCurrStates);
} else {
environment.getTableManager().transitionTableState(cloneInfo.tableId, TableState.ONLINE);
environment.getTableManager().transitionTableState(cloneInfo.tableId, TableState.ONLINE,
expectedCurrStates);
}

Utils.unreserveNamespace(environment, cloneInfo.srcNamespaceId, tid, false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.accumulo.manager.tableOps.create;

import java.io.IOException;
import java.util.Set;

import org.apache.accumulo.core.client.admin.InitialTableState;
import org.apache.accumulo.core.fate.Repo;
Expand Down Expand Up @@ -50,13 +51,14 @@ public long isReady(long tid, Manager environment) {

@Override
public Repo<Manager> call(long tid, Manager env) throws Exception {
final Set<TableState> expectedCurrStates = Set.of(TableState.NEW);

if (tableInfo.getInitialTableState() == InitialTableState.OFFLINE) {
env.getContext().getTableManager().transitionTableState(tableInfo.getTableId(),
TableState.OFFLINE);
TableState.OFFLINE, expectedCurrStates);
} else {
env.getContext().getTableManager().transitionTableState(tableInfo.getTableId(),
TableState.ONLINE);
TableState.ONLINE, expectedCurrStates);
}

Utils.unreserveNamespace(env, tableInfo.getNamespaceId(), tid, false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package org.apache.accumulo.manager.tableOps.delete;

import java.util.Set;

import org.apache.accumulo.core.clientImpl.thrift.TableOperation;
import org.apache.accumulo.core.data.NamespaceId;
import org.apache.accumulo.core.data.TableId;
Expand Down Expand Up @@ -47,7 +49,8 @@ public long isReady(long tid, Manager env) throws Exception {

@Override
public Repo<Manager> call(long tid, Manager env) {
env.getTableManager().transitionTableState(tableId, TableState.DELETING);
final Set<TableState> expectedCurrStates = Set.of(TableState.ONLINE, TableState.OFFLINE);
env.getTableManager().transitionTableState(tableId, TableState.DELETING, expectedCurrStates);
env.getEventCoordinator().event("deleting table %s ", tableId);
return new CleanUp(tableId, namespaceId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import static org.apache.accumulo.core.Constants.IMPORT_MAPPINGS_FILE;

import java.util.Set;

import org.apache.accumulo.core.fate.Repo;
import org.apache.accumulo.core.manager.state.tables.TableState;
import org.apache.accumulo.manager.Manager;
Expand Down Expand Up @@ -52,8 +54,9 @@ public Repo<Manager> call(long tid, Manager env) throws Exception {
}
}

final Set<TableState> expectedCurrStates = Set.of(TableState.NEW);
final TableState newState = tableInfo.keepOffline ? TableState.OFFLINE : TableState.ONLINE;
env.getTableManager().transitionTableState(tableInfo.tableId, newState);
env.getTableManager().transitionTableState(tableInfo.tableId, newState, expectedCurrStates);

Utils.unreserveNamespace(env, tableInfo.namespaceId, tid, false);
Utils.unreserveTable(env, tableInfo.tableId, tid, true);
Expand Down

0 comments on commit 4d01ec8

Please sign in to comment.