Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Switch to official doppelganger detection implementation. #4381

Merged
merged 1 commit into from
Dec 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 64 additions & 1 deletion beacon_chain/rpc/rest_validator_api.nim
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.

import std/[typetraits, sets]
import std/[typetraits, sets, sequtils]
import stew/[results, base10], chronicles
import ".."/[beacon_chain_db, beacon_node],
".."/networking/eth2_network,
Expand Down Expand Up @@ -853,3 +853,66 @@ proc installValidatorApiHandlers*(router: var RestRouter, node: BeaconNode) =
signedValidatorRegistration

return RestApiResponse.response("", Http200, "text/plain")

router.api(MethodPost, "/eth/v1/validator/liveness/{epoch}") do (
epoch: Epoch, contentBody: Option[ContentBody]) -> RestApiResponse:
let
qepoch =
block:
if epoch.isErr():
return RestApiResponse.jsonError(Http400, InvalidEpochValueError,
$epoch.error())
let
res = epoch.get()
wallEpoch = node.currentSlot().epoch()
nextEpoch =
if wallEpoch == FAR_FUTURE_EPOCH:
wallEpoch
else:
wallEpoch + 1
prevEpoch = get_previous_epoch(wallEpoch)
if (res < prevEpoch) or (res > nextEpoch):
return RestApiResponse.jsonError(Http400, InvalidEpochValueError,
"Requested epoch is more than one epoch from current epoch")
res
indexList =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

reading the index list should really be a function, given how we read the same list in several places (attestations etc)

block:
if contentBody.isNone():
return RestApiResponse.jsonError(Http400, EmptyRequestBodyError)
let dres = decodeBody(seq[RestValidatorIndex], contentBody.get())
if dres.isErr():
return RestApiResponse.jsonError(Http400,
InvalidValidatorIndexValueError,
$dres.error())
var
res: seq[ValidatorIndex]
dupset: HashSet[ValidatorIndex]

let items = dres.get()
for item in items:
let vres = item.toValidatorIndex()
if vres.isErr():
case vres.error()
of ValidatorIndexError.TooHighValue:
return RestApiResponse.jsonError(Http400,
TooHighValidatorIndexValueError)
of ValidatorIndexError.UnsupportedValue:
return RestApiResponse.jsonError(Http500,
UnsupportedValidatorIndexValueError)
let index = vres.get()
if index in dupset:
return RestApiResponse.jsonError(Http400,
DuplicateValidatorIndexArrayError)
dupset.incl(index)
res.add(index)
if len(res) == 0:
return RestApiResponse.jsonError(Http400,
EmptyValidatorIndexArrayError)
res
response = indexList.mapIt(
RestLivenessItem(
index: it,
is_live: node.attestationPool[].validatorSeenAtEpoch(qepoch, it)
)
)
return RestApiResponse.jsonResponse(response)
37 changes: 37 additions & 0 deletions beacon_chain/spec/eth2_apis/eth2_rest_serialization.nim
Original file line number Diff line number Diff line change
Expand Up @@ -2364,6 +2364,43 @@ proc readValue*(reader: var JsonReader[RestJson],
value = RestActivityItem(index: index.get(), epoch: epoch.get(),
active: active.get())

## RestLivenessItem
proc writeValue*(writer: var JsonWriter[RestJson],
value: RestLivenessItem) {.
raises: [IOError, Defect].} =
writer.beginRecord()
writer.writeField("index", value.index)
writer.writeField("is_live", value.is_live)
writer.endRecord()

proc readValue*(reader: var JsonReader[RestJson],
value: var RestLivenessItem) {.
raises: [SerializationError, IOError, Defect].} =
var index: Option[ValidatorIndex]
var isLive: Option[bool]

for fieldName in readObjectFields(reader):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why does this need a custom reader? isn't it simple enough?

case fieldName
of "index":
if index.isSome():
reader.raiseUnexpectedField(
"Multiple `index` fields found", "RestLivenessItem")
index = some(reader.readValue(ValidatorIndex))
of "is_live":
if isLive.isSome():
reader.raiseUnexpectedField(
"Multiple `is_live` fields found", "RestLivenessItem")
isLive = some(reader.readValue(bool))
else:
discard

if index.isNone():
reader.raiseUnexpectedValue("Missing or empty `index` value")
if isLive.isNone():
reader.raiseUnexpectedValue("Missing or empty `is_live` value")

value = RestLivenessItem(index: index.get(), is_live: isLive.get())

## HeadChangeInfoObject
proc writeValue*(writer: var JsonWriter[RestJson],
value: HeadChangeInfoObject) {.
Expand Down
5 changes: 5 additions & 0 deletions beacon_chain/spec/eth2_apis/rest_types.nim
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,10 @@ type
epoch*: Epoch
active*: bool

RestLivenessItem* = object
index*: ValidatorIndex
is_live*: bool

PrepareBeaconProposer* = object
validator_index*: ValidatorIndex
fee_recipient*: Eth1Address
Expand Down Expand Up @@ -608,6 +612,7 @@ type
ProduceSyncCommitteeContributionResponse* = DataEnclosedObject[SyncCommitteeContribution]
SubmitBlindedBlockResponse* = DataEnclosedObject[bellatrix.ExecutionPayload]
GetValidatorsActivityResponse* = DataEnclosedObject[seq[RestActivityItem]]
GetValidatorsLivenessResponse* = DataEnclosedObject[seq[RestLivenessItem]]

func `==`*(a, b: RestValidatorIndex): bool =
uint64(a) == uint64(b)
Expand Down
6 changes: 6 additions & 0 deletions beacon_chain/spec/eth2_apis/rest_validator_calls.nim
Original file line number Diff line number Diff line change
Expand Up @@ -99,3 +99,9 @@ proc registerValidator*(body: seq[SignedValidatorRegistrationV1]): RestPlainResp
rest, endpoint: "/eth/v1/validator/register_validator",
meth: MethodPost.}
## https://ethereum.github.io/beacon-APIs/#/Validator/registerValidator

proc getValidatorsLiveness*(epoch: Epoch,
body: seq[ValidatorIndex]
): RestPlainResponse {.
rest, endpoint: "/eth/v1/validator/liveness/{epoch}",
meth: MethodPost.}
104 changes: 104 additions & 0 deletions beacon_chain/validator_client/api.nim
Original file line number Diff line number Diff line change
Expand Up @@ -2232,3 +2232,107 @@ proc registerValidator*(
status = response.status, endpoint = apiResponse.node,
message = response.getErrorMessage()
return count

proc getValidatorsLiveness*(
vc: ValidatorClientRef, epoch: Epoch,
validators: seq[ValidatorIndex]
): Future[GetValidatorsLivenessResponse] {.async.} =
logScope: request = "getValidatorsActivity"
let resp = vc.onceToAll(RestPlainResponse, SlotDuration,
{BeaconNodeRole.Duties},
getValidatorsLiveness(it, epoch, validators))
case resp.status
of ApiOperation.Timeout:
debug "Unable to perform validator's liveness request in time",
timeout = SlotDuration
return GetValidatorsLivenessResponse()
of ApiOperation.Interrupt:
debug "Validator's liveness request was interrupted"
return GetValidatorsLivenessResponse()
of ApiOperation.Failure:
debug "Unexpected error happened while receiving validator's liveness"
return GetValidatorsLivenessResponse()
of ApiOperation.Success:
let defaultLiveness = RestLivenessItem(index: ValidatorIndex(high(uint32)))
var activities: Table[ValidatorIndex, RestLivenessItem]
for apiResponse in resp.data:
if apiResponse.data.isErr():
debug "Unable to retrieve validators liveness data",
endpoint = apiResponse.node, error = apiResponse.data.error()
else:
let response = apiResponse.data.get()
case response.status
of 200:
let res = decodeBytes(GetValidatorsLivenessResponse,
response.data, response.contentType)
if res.isOk():
let list = res.get().data
if len(list) != len(validators):
debug "Received incomplete validators liveness response",
endpoint = apiResponse.node,
validators_count = len(validators),
activities_count = len(list)
continue
else:
var updated = 0
for item in list:
activities.withValue(item.index, stored):
if item.is_live:
stored[].is_live = true
inc(updated)
do:
activities[item.index] = item
inc(updated)
debug "Received validators liveness response",
endpoint = apiResponse.node,
validators_count = len(validators),
activities_count = len(list),
updated_count = updated
else:
debug "Received invalid/incomplete response",
endpoint = apiResponse.node, error_message = res.error()
apiResponse.node.status = RestBeaconNodeStatus.Incompatible
continue
of 400:
debug "Server reports invalid request",
response_code = response.status,
endpoint = apiResponse.node,
response_error = response.getErrorMessage()
apiResponse.node.status = RestBeaconNodeStatus.Incompatible
continue
of 500:
debug "Server reports internal error",
response_code = response.status,
endpoint = apiResponse.node,
response_error = response.getErrorMessage()
apiResponse.node.status = RestBeaconNodeStatus.Offline
continue
of 503:
debug "Server reports that it not in sync",
response_code = response.status,
endpoint = apiResponse.node,
response_error = response.getErrorMessage()
apiResponse.node.status = RestBeaconNodeStatus.NotSynced
continue
else:
debug "Server reports unexpected error code",
response_code = response.status,
endpoint = apiResponse.node,
response_error = response.getErrorMessage()
apiResponse.node.status = RestBeaconNodeStatus.Offline
continue

var response =
block:
var res: seq[RestLivenessItem]
for vindex in validators:
let item = activities.getOrDefault(vindex, defaultLiveness)
if item == defaultLiveness:
debug "Validator is missing in response",
validator_index = vindex
return GetValidatorsLivenessResponse()
else:
res.add(item)
res

return GetValidatorsLivenessResponse(data: response)
10 changes: 5 additions & 5 deletions beacon_chain/validator_client/doppelganger_service.nim
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ proc waitForNextEpoch(service: DoppelgangerServiceRef) {.async.} =
await sleepAsync(sleepTime)

proc processActivities(service: DoppelgangerServiceRef, epoch: Epoch,
activities: GetValidatorsActivityResponse) =
activities: GetValidatorsLivenessResponse) =
let vc = service.client
if len(activities.data) == 0:
debug "Unable to monitor validator's activity for epoch", epoch = epoch
Expand All @@ -39,10 +39,10 @@ proc processActivities(service: DoppelgangerServiceRef, epoch: Epoch,
value.epochsCount = 0'u64
value.lastAttempt = DoppelgangerAttempt.Failure
else:
for activity in activities.data:
let vindex = activity.index
for item in activities.data:
let vindex = item.index
vc.doppelgangerDetection.validators.withValue(vindex, value):
if activity.active:
if item.is_live:
if value.status == DoppelgangerStatus.Checking:
value.epochsCount = 0'u64
value.lastAttempt = DoppelgangerAttempt.SuccessTrue
Expand Down Expand Up @@ -86,7 +86,7 @@ proc mainLoop(service: DoppelgangerServiceRef) {.async.} =
currentEpoch - 1'u64
validators = vc.getCheckingList()
if len(validators) > 0:
let activities = await vc.getValidatorsActivity(previousEpoch,
let activities = await vc.getValidatorsLiveness(previousEpoch,
validators)
service.processActivities(previousEpoch, activities)
else:
Expand Down