Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: better store logs #3103

Merged
merged 3 commits into from
Oct 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions waku/common/databases/db_postgres/dbconn.nim
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ proc dbConnQuery*(
return err("error in dbConnQuery calling sendQuery: " & $error)

let sendDuration = getTime().toUnixFloat() - queryStartTime
query_time_secs.set(sendDuration, [querySummary, "sendQuery"])
query_time_secs.set(sendDuration, [querySummary, "sendToDBQuery"])

queryStartTime = getTime().toUnixFloat()

Expand All @@ -246,9 +246,10 @@ proc dbConnQuery*(
debug "dbConnQuery",
requestId,
query = $query,
args,
querySummary,
waitDurationSecs = waitDuration,
sendDurationSecs = sendDuration
waitDbQueryDurationSecs = waitDuration,
sendToDBDurationSecs = sendDuration

return ok()

Expand All @@ -270,7 +271,7 @@ proc dbConnQueryPrepared*(
return err("error in dbConnQueryPrepared calling sendQuery: " & $error)

let sendDuration = getTime().toUnixFloat() - queryStartTime
query_time_secs.set(sendDuration, [stmtName, "sendQuery"])
query_time_secs.set(sendDuration, [stmtName, "sendToDBQuery"])

queryStartTime = getTime().toUnixFloat()

Expand All @@ -286,7 +287,8 @@ proc dbConnQueryPrepared*(
debug "dbConnQueryPrepared",
requestId,
stmtName,
waitDurationSecs = waitDuration,
sendDurationSecs = sendDuration
paramValues,
waitDbQueryDurationSecs = waitDuration,
sendToDBDurationSecs = sendDuration

return ok()
8 changes: 6 additions & 2 deletions waku/waku_store/protocol.nim
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ proc initProtocolHandler(self: WakuStore) =
proc handler(conn: Connection, proto: string) {.async, gcsafe, closure.} =
var successfulQuery = false ## only consider the correct queries in metrics
var resBuf: StoreResp
var queryDuration: float
self.requestRateLimiter.checkUsageLimit(WakuStoreCodec, conn):
let readRes = catch:
await conn.readLp(DefaultMaxRpcSize.int)
Expand All @@ -107,7 +108,7 @@ proc initProtocolHandler(self: WakuStore) =

resBuf = await self.handleQueryRequest(conn.peerId, reqBuf)

let queryDuration = getTime().toUnixFloat() - queryStartTime
queryDuration = getTime().toUnixFloat() - queryStartTime
waku_store_time_seconds.set(queryDuration, ["query-db-time"])
successfulQuery = true
do:
Expand All @@ -124,10 +125,13 @@ proc initProtocolHandler(self: WakuStore) =
error "Connection write error", error = writeRes.error.msg
return

debug "after sending response", requestId = resBuf.requestId
if successfulQuery:
let writeDuration = getTime().toUnixFloat() - writeRespStartTime
waku_store_time_seconds.set(writeDuration, ["send-store-resp-time"])
debug "after sending response",
requestId = resBuf.requestId,
queryDurationSecs = queryDuration,
writeStreamDurationSecs = writeDuration

waku_service_network_bytes.inc(
amount = resBuf.resp.len().int64, labelValues = [WakuStoreCodec, "out"]
Expand Down
42 changes: 28 additions & 14 deletions waku/waku_store_legacy/protocol.nim
Original file line number Diff line number Diff line change
Expand Up @@ -37,23 +37,23 @@ type WakuStore* = ref object of LPProtocol

## Protocol

type StoreResp = tuple[resp: seq[byte], requestId: string]

proc handleLegacyQueryRequest(
self: WakuStore, requestor: PeerId, raw_request: seq[byte]
): Future[seq[byte]] {.async.} =
): Future[StoreResp] {.async.} =
let decodeRes = HistoryRPC.decode(raw_request)
if decodeRes.isErr():
error "failed to decode rpc", peerId = requestor
error "failed to decode rpc", peerId = requestor, error = $decodeRes.error
waku_legacy_store_errors.inc(labelValues = [decodeRpcFailure])
# TODO: Return (BAD_REQUEST, cause: "decode rpc failed")
return
return (newSeq[byte](), "failed to decode rpc")

let reqRpc = decodeRes.value

if reqRpc.query.isNone():
error "empty query rpc", peerId = requestor, requestId = reqRpc.requestId
waku_legacy_store_errors.inc(labelValues = [emptyRpcQueryFailure])
# TODO: Return (BAD_REQUEST, cause: "empty query")
return
return (newSeq[byte](), "empty query rpc")

let requestId = reqRpc.requestId
var request = reqRpc.query.get().toAPI()
Expand All @@ -72,21 +72,30 @@ proc handleLegacyQueryRequest(

let error = HistoryError(kind: HistoryErrorKind.UNKNOWN).toRPC()
let response = HistoryResponseRPC(error: error)
return HistoryRPC(requestId: requestId, response: some(response)).encode().buffer
return (
HistoryRPC(requestId: requestId, response: some(response)).encode().buffer,
requestId,
)

if responseRes.isErr():
error "history query failed",
peerId = requestor, requestId = requestId, error = responseRes.error

let response = responseRes.toRPC()
return HistoryRPC(requestId: requestId, response: some(response)).encode().buffer
return (
HistoryRPC(requestId: requestId, response: some(response)).encode().buffer,
requestId,
)

let response = responseRes.toRPC()

info "sending history response",
peerId = requestor, requestId = requestId, messages = response.messages.len

return HistoryRPC(requestId: requestId, response: some(response)).encode().buffer
return (
HistoryRPC(requestId: requestId, response: some(response)).encode().buffer,
requestId,
)

proc initProtocolHandler(ws: WakuStore) =
let rejectResponseBuf = HistoryRPC(
Expand All @@ -103,7 +112,8 @@ proc initProtocolHandler(ws: WakuStore) =

proc handler(conn: Connection, proto: string) {.async, closure.} =
var successfulQuery = false ## only consider the correct queries in metrics
var resBuf: seq[byte]
var resBuf: StoreResp
var queryDuration: float
ws.requestRateLimiter.checkUsageLimit(WakuLegacyStoreCodec, conn):
let readRes = catch:
await conn.readLp(DefaultMaxRpcSize.int)
Expand All @@ -118,17 +128,17 @@ proc initProtocolHandler(ws: WakuStore) =

let queryStartTime = getTime().toUnixFloat()
resBuf = await ws.handleLegacyQueryRequest(conn.peerId, reqBuf)
let queryDuration = getTime().toUnixFloat() - queryStartTime
queryDuration = getTime().toUnixFloat() - queryStartTime
waku_legacy_store_time_seconds.set(queryDuration, ["query-db-time"])
successfulQuery = true
do:
debug "Legacy store query request rejected due rate limit exceeded",
peerId = conn.peerId, limit = $ws.requestRateLimiter.setting
resBuf = rejectResponseBuf
resBuf = (rejectResponseBuf, "rejected")

let writeRespStartTime = getTime().toUnixFloat()
let writeRes = catch:
await conn.writeLp(resBuf)
await conn.writeLp(resBuf.resp)

if writeRes.isErr():
error "Connection write error", error = writeRes.error.msg
Expand All @@ -137,9 +147,13 @@ proc initProtocolHandler(ws: WakuStore) =
if successfulQuery:
let writeDuration = getTime().toUnixFloat() - writeRespStartTime
waku_legacy_store_time_seconds.set(writeDuration, ["send-store-resp-time"])
debug "after sending response",
requestId = resBuf.requestId,
queryDurationSecs = queryDuration,
writeStreamDurationSecs = writeDuration

waku_service_network_bytes.inc(
amount = resBuf.len().int64, labelValues = [WakuLegacyStoreCodec, "out"]
amount = resBuf.resp.len().int64, labelValues = [WakuLegacyStoreCodec, "out"]
)

ws.handler = handler
Expand Down
Loading