Skip to content

Commit

Permalink
Leader leases
Browse files Browse the repository at this point in the history
Summary: Leader leases: a way to ensure that no two servers in the same Raft group act as leaders at any given time. Every leader tries to replicate a lease extension with its consensus update sent to followers. A majority-replicated replicated lease takes effect and will be honored by any future leader. The old leader has to stop serving reads after its lease expires, if it cannot renew it. A new leader has to wait out the old leader's lease before serving reads or acknowledging writes. We work around clock skew issues by never sending timestamps from one server to another in this mechanism, only time intervals. We could improve this mechanism after a clock skew tracking subsystem is implemented.

Test Plan: Jenkins

Reviewers: robert, bharat, bogdan, sergei

Reviewed By: sergei

Subscribers: ybase

Differential Revision: https://phabricator.dev.yugabyte.com/D2109
  • Loading branch information
mbautin committed Sep 8, 2017
1 parent 92abbdf commit ad2e046
Show file tree
Hide file tree
Showing 35 changed files with 746 additions and 185 deletions.
8 changes: 6 additions & 2 deletions java/yb-client/src/main/java/org/yb/client/TabletClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -479,15 +479,19 @@ private Exception dispatchTSErrorOrReturnException(YRpc rpc,
ybClient.handleTabletNotFound(rpc, ex, this);
// we're not calling rpc.callback() so we rely on the client to retry that RPC
} else if (code == WireProtocol.AppStatusPB.ErrorCode.SERVICE_UNAVAILABLE ||
code == WireProtocol.AppStatusPB.ErrorCode.LEADER_NOT_READY_TO_SERVE ||
error.getCode() ==
Tserver.TabletServerErrorPB.Code.LEADER_NOT_READY_CHANGE_CONFIG ||
error.getCode() ==
Tserver.TabletServerErrorPB.Code.LEADER_NOT_READY_TO_STEP_DOWN ||
error.getCode() ==
Tserver.TabletServerErrorPB.Code.LEADER_NOT_READY_TO_SERVE) {
ybClient.handleRetryableError(rpc, ex, this);
// The following error codes are an indication that the tablet isn't a leader.
} else if (code == WireProtocol.AppStatusPB.ErrorCode.ILLEGAL_STATE ||
// The following error codes are an indication that the tablet isn't a leader, or, in case
// of LEADER_HAS_NO_LEASE, might no longer be the leader due to failing to replicate a leader
// lease, so we retry looking up the leader anyway.
} else if (code == WireProtocol.AppStatusPB.ErrorCode.LEADER_HAS_NO_LEASE ||
code == WireProtocol.AppStatusPB.ErrorCode.ILLEGAL_STATE ||
code == WireProtocol.AppStatusPB.ErrorCode.ABORTED ||
error.getCode() == Tserver.TabletServerErrorPB.Code.NOT_THE_LEADER) {
ybClient.handleNotLeader(rpc, ex, this);
Expand Down
9 changes: 9 additions & 0 deletions src/yb/client/client-internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -881,6 +881,15 @@ void GetTableSchemaRpc::SendRpcCb(const Status& status) {
return;
}
}
if (resp_.error().status().code() == AppStatusPB::LEADER_NOT_READY_TO_SERVE ||
resp_.error().status().code() == AppStatusPB::LEADER_HAS_NO_LEASE) {
LOG(WARNING) << "Leader Master " << client_->data_->leader_master_hostport().ToString()
<< " does not have a valid exclusive lease: "
<< resp_.error().status().ShortDebugString() << ", re-trying...";
ResetLeaderMasterAndRetry();
return;
}
LOG(INFO) << "DEBUG: resp_.error().status()=" << resp_.error().status().DebugString();
new_status = StatusFromPB(resp_.error().status());
}

Expand Down
9 changes: 6 additions & 3 deletions src/yb/client/tablet_rpc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -179,9 +179,12 @@ bool TabletInvoker::Done(Status* status) {
//
// TODO: IllegalState is obviously way too broad an error category for
// this case.
if (status->IsIllegalState() || status->IsServiceUnavailable() || status->IsAborted()) {
const bool leader_is_not_ready = ErrorCode(rpc_->response_error()) ==
tserver::TabletServerErrorPB::LEADER_NOT_READY_TO_SERVE;
if (status->IsIllegalState() || status->IsServiceUnavailable() || status->IsAborted() ||
status->IsLeaderNotReadyToServe() || status->IsLeaderHasNoLease()) {
const bool leader_is_not_ready =
ErrorCode(rpc_->response_error()) ==
tserver::TabletServerErrorPB::LEADER_NOT_READY_TO_SERVE ||
status->IsLeaderNotReadyToServe();

// If the leader just is not ready - let's retry the same tserver.
// Else the leader became a follower and must be reset on retry.
Expand Down
8 changes: 8 additions & 0 deletions src/yb/common/wire_protocol.cc
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,10 @@ void StatusToPB(const Status& status, AppStatusPB* pb) {
pb->set_code(AppStatusPB::INTERNAL_ERROR);
} else if (status.IsExpired()) {
pb->set_code(AppStatusPB::EXPIRED);
} else if (status.IsLeaderHasNoLease()) {
pb->set_code(AppStatusPB::LEADER_HAS_NO_LEASE);
} else if (status.IsLeaderNotReadyToServe()) {
pb->set_code(AppStatusPB::LEADER_NOT_READY_TO_SERVE);
} else {
LOG(WARNING) << "Unknown error code translation from internal error "
<< status.ToString() << ": sending UNKNOWN_ERROR";
Expand Down Expand Up @@ -180,6 +184,10 @@ Status StatusFromPB(const AppStatusPB& pb) {
return STATUS(InternalError, pb.message(), "", posix_code);
case AppStatusPB::EXPIRED:
return STATUS(Expired, pb.message(), "", posix_code);
case AppStatusPB::LEADER_HAS_NO_LEASE:
return STATUS(LeaderHasNoLease, pb.message(), "", posix_code);
case AppStatusPB::LEADER_NOT_READY_TO_SERVE:
return STATUS(LeaderNotReadyToServe, pb.message(), "", posix_code);
case AppStatusPB::UNKNOWN_ERROR:
default:
LOG(WARNING) << "Unknown error code in status: " << pb.ShortDebugString();
Expand Down
2 changes: 2 additions & 0 deletions src/yb/common/wire_protocol.proto
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ message AppStatusPB {
SQL_ERROR = 20;
INTERNAL_ERROR = 21;
EXPIRED = 22;
LEADER_NOT_READY_TO_SERVE = 23;
LEADER_HAS_NO_LEASE = 24;
}

required ErrorCode code = 1;
Expand Down
1 change: 1 addition & 0 deletions src/yb/consensus/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ set(CONSENSUS_SRCS

add_library(consensus ${CONSENSUS_SRCS})
cotire(consensus)

target_link_libraries(consensus
consensus_proto
yb_common
Expand Down
1 change: 1 addition & 0 deletions src/yb/consensus/consensus-test-util.h
Original file line number Diff line number Diff line change
Expand Up @@ -868,6 +868,7 @@ class TestRaftConsensusQueueIface : public PeerMessageQueueObserver {

protected:
virtual void UpdateMajorityReplicated(const OpId& majority_replicated,
MonoTime majority_replicated_leader_lease_expiration,
OpId* committed_index) override {
std::lock_guard<simple_spinlock> lock(lock_);
majority_replicated_index_ = majority_replicated.index();
Expand Down
33 changes: 22 additions & 11 deletions src/yb/consensus/consensus.h
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ class Consensus : public RefCountedThreadSafe<Consensus> {
// commit index, which tells them to apply the operation.
//
// This method can only be called on the leader, i.e. role() == LEADER

virtual CHECKED_STATUS Replicate(const ConsensusRoundPtr& round) = 0;

// A batch version of Replicate, which is what we try to use as much as possible for performance.
Expand Down Expand Up @@ -286,12 +287,12 @@ class Consensus : public RefCountedThreadSafe<Consensus> {
// Messages sent from CANDIDATEs to voting peers to request their vote
// in leader election.
virtual CHECKED_STATUS RequestVote(const VoteRequestPB* request,
VoteResponsePB* response) = 0;
VoteResponsePB* response) = 0;

// Implement a ChangeConfig() request.
virtual CHECKED_STATUS ChangeConfig(const ChangeConfigRequestPB& req,
const StatusCallback& client_cb,
boost::optional<tserver::TabletServerErrorPB::Code>* error) {
const StatusCallback& client_cb,
boost::optional<tserver::TabletServerErrorPB::Code>* error) {
return STATUS(NotSupported, "Not implemented.");
}

Expand All @@ -307,12 +308,17 @@ class Consensus : public RefCountedThreadSafe<Consensus> {
// Returns the id of the tablet whose updates this consensus instance helps coordinate.
virtual std::string tablet_id() const = 0;

// Returns a copy of the committed state of the Consensus system.
virtual ConsensusStatePB ConsensusState(ConsensusConfigType type) const = 0;
// Returns a copy of the committed state of the Consensus system. Also allows returning the
// leader lease status captured under the same lock.
virtual ConsensusStatePB ConsensusState(
ConsensusConfigType type,
LeaderLeaseStatus* leader_lease_status = nullptr) const = 0;

// Returns a copy of the committed state of the Consensus system,
// assuming caller holds the needed locks
virtual ConsensusStatePB ConsensusStateUnlocked(ConsensusConfigType type) const = 0;
// Returns a copy of the committed state of the Consensus system, assuming caller holds the needed
// locks.
virtual ConsensusStatePB ConsensusStateUnlocked(
ConsensusConfigType type,
LeaderLeaseStatus* leader_lease_status = nullptr) const = 0;

// Returns a copy of the current committed Raft configuration.
virtual RaftConfigPB CommittedConfig() const = 0;
Expand All @@ -326,13 +332,18 @@ class Consensus : public RefCountedThreadSafe<Consensus> {
// Stops running the consensus algorithm.
virtual void Shutdown() = 0;

// Returns the last OpId (either received or committed, depending on the
// 'type' argument) that the Consensus implementation knows about.
// Primarily used for testing purposes.
// Returns the last OpId (either received or committed, depending on the 'type' argument) that the
// Consensus implementation knows about. Primarily used for testing purposes.
virtual CHECKED_STATUS GetLastOpId(OpIdType type, OpId* id) {
return STATUS(NotFound, "Not implemented.");
}

// Assuming we are the leader, wait until we have a valid leader lease (i.e. the old leader's
// lease has expired, and we have replicated a new lease that has not expired yet).
virtual Status WaitForLeaderLeaseImprecise(MonoTime deadline) = 0;

virtual Status CheckIsActiveLeaderAndHasLease() const = 0;

protected:
friend class RefCountedThreadSafe<Consensus>;
friend class tablet::TabletPeer;
Expand Down
38 changes: 33 additions & 5 deletions src/yb/consensus/consensus.proto
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.
//
//
// The following only applies to changes made to this file as part of YugaByte development.
//
// Portions Copyright (c) YugaByte, Inc.
Expand All @@ -29,6 +30,7 @@
// or implied. See the License for the specific language governing permissions and limitations
// under the License.
//

package yb.consensus;

option java_package = "org.yb.consensus";
Expand Down Expand Up @@ -176,22 +178,26 @@ message ChangeConfigResponsePB {
optional fixed64 hybrid_time = 2;
}

// A Replicate message, sent to replicas by leader to indicate this operation must
// be stored in the WAL/SM log, as part of the first phase of the two phase
// commit.
// A Replicate message, sent to replicas by leader to indicate this operation must be stored in the
// write-ahead log.
message ReplicateMsg {
// The Raft operation ID (term and index) being replicated.

required OpIdPB id = 1;
// The (hybrid or logical) hybrid_time assigned to this message.

// The hybrid time assigned to this message.
required fixed64 hybrid_time = 2;

// A counter that is forever increasing in a tablet (like hybrid time). Used for list indexing.
optional int64 monotonic_counter = 9;

// optional ExternalConsistencyMode external_consistency_mode = 3 [default = NO_CONSISTENCY];
required OperationType op_type = 4;
optional tserver.WriteRequestPB write_request = 5;
optional tserver.AlterSchemaRequestPB alter_schema_request = 6;
optional tserver.TransactionStatePB transaction_state = 10;
optional ChangeConfigRecordPB change_config_record = 7;

// The Raft operation ID known to the leader to be committed at the time this message was sent.
// This is used during tablet bootstrap for RocksDB-backed tables.
optional OpIdPB committed_op_id = 8;
Expand Down Expand Up @@ -307,6 +313,10 @@ message VoteResponsePB {
// True if this peer voted for the caller, false otherwise.
optional bool vote_granted = 3;

// An upper bound on the remainder of the old leader's lease that the new leader has to wait out
// before it can start serving read and write requests.
optional int32 remaining_leader_lease_duration_ms = 4;

// TODO: Migrate ConsensusService to the AppStatusPB RPC style and merge these errors.
// Error message from the consensus implementation.
optional ConsensusErrorPB consensus_error = 998;
Expand Down Expand Up @@ -353,6 +363,13 @@ message ConsensusRequestPB {
// these operations are already committed, in which case they will be
// committed during the same request.
repeated ReplicateMsg ops = 6;

// Leader lease duration in milliseconds. A leader is not allowed to serve up-to-date reads
// until it is able to replicate a lease extension. A new leader cannot assume its
// responsibilities until this amount time has definitely passed since the old leader sent the
// consensus request. Due to potential clock skew, we are not sending a timestamp, but an amount
// of time followers have to wait.
optional int32 leader_lease_duration_ms = 8;
}

message ConsensusResponsePB {
Expand Down Expand Up @@ -451,6 +468,12 @@ message LeaderStepDownResponsePB {
optional tserver.TabletServerErrorPB error = 1;
}

enum LeaderLeaseStatus {
HAS_LEASE = 0;
OLD_LEADER_MAY_HAVE_LEASE = 1;
NO_MAJORITY_REPLICATED_LEASE = 2;
}

enum OpIdType {
UNKNOWN_OPID_TYPE = 0;
RECEIVED_OPID = 1;
Expand Down Expand Up @@ -487,8 +510,13 @@ message GetConsensusStateRequestPB {

message GetConsensusStateResponsePB {
optional ConsensusStatePB cstate = 1;
// A generic error message (such as tablet not found).

// A generic error message (such as tablet not found).
optional tserver.TabletServerErrorPB error = 2;

// Allows returning the leader lease status in the same RPC. Useful for waiting for the leader
// to be allowed to serve requests.
optional LeaderLeaseStatus leader_lease_status = 3;
}

message StartRemoteBootstrapRequestPB {
Expand Down
6 changes: 4 additions & 2 deletions src/yb/consensus/consensus_queue-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,8 @@ TEST_F(ConsensusQueueTest, TestGetPagedMessages) {
page_size_estimator.set_caller_term(14);
OpId* committed_index = page_size_estimator.mutable_committed_index();
OpId* preceding_id = page_size_estimator.mutable_preceding_id();
// The actual leader lease duration does not matter here, we just want it to be set.
page_size_estimator.set_leader_lease_duration_ms(2000);
committed_index->CopyFrom(MinimumOpId());
preceding_id->CopyFrom(MinimumOpId());

Expand Down Expand Up @@ -651,7 +653,7 @@ TEST_F(ConsensusQueueTest, TestQueueMovesWatermarksBackward) {
ASSERT_OPID_EQ(queue_->GetAllReplicatedIndexForTests(), MakeOpId(1, 10));
// Now rewrite some of the operations and wait for the log to append.
Synchronizer synch;
CHECK_OK(queue_->AppendOperations(
ASSERT_OK(queue_->AppendOperations(
{ CreateDummyReplicate(2, 5, clock_->Now(), 0) },
synch.AsStatusCallback()));

Expand All @@ -661,7 +663,7 @@ TEST_F(ConsensusQueueTest, TestQueueMovesWatermarksBackward) {
// Without the fix the following append would trigger a check failure
// in log cache.
synch.Reset();
CHECK_OK(queue_->AppendOperations(
ASSERT_OK(queue_->AppendOperations(
{ CreateDummyReplicate(2, 6, clock_->Now(), 0) },
synch.AsStatusCallback()));

Expand Down
Loading

0 comments on commit ad2e046

Please sign in to comment.