Skip to content

Commit

Permalink
Squashed commit of the following:
Browse files Browse the repository at this point in the history
commit 0ede4ee
Author: Jeromy Cannon <[email protected]>
Date:   Wed Mar 20 21:54:30 2024 +0000

    timeout fetch plus log messages

    Signed-off-by: Jeromy Cannon <[email protected]>

commit abdabc2
Author: Jeromy Cannon <[email protected]>
Date:   Wed Mar 20 21:32:36 2024 +0000

    fix check failure

    Signed-off-by: Jeromy Cannon <[email protected]>

commit 6369895
Author: Jeromy Cannon <[email protected]>
Date:   Wed Mar 20 20:55:24 2024 +0000

    working version

    Signed-off-by: Jeromy Cannon <[email protected]>

Signed-off-by: Jeromy Cannon <[email protected]>
  • Loading branch information
jeromy-cannon committed Mar 21, 2024
1 parent feba5f9 commit a907f45
Show file tree
Hide file tree
Showing 6 changed files with 124 additions and 65 deletions.
101 changes: 74 additions & 27 deletions src/commands/node.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,21 @@ export class NodeCommand extends BaseCommand {
this.keyManager = opts.keyManager
this.accountManager = opts.accountManager
this.keytoolDepManager = opts.keytoolDepManager
this._portForwards = []
}

/**
* stops and closes the port forwards
* @returns {Promise<void>}
*/
async close () {
if (this._portForwards) {
for (const srv of this._portForwards) {
await this.k8.stopPortForward(srv)
}
}

this._portForwards = []
}

async checkNetworkNodePod (namespace, nodeId) {
Expand Down Expand Up @@ -637,10 +652,11 @@ export class NodeCommand extends BaseCommand {
title: 'Check node proxies are ACTIVE',
task: async (ctx, parentTask) => {
const subTasks = []
let localPort = constants.LOCAL_NODE_PROXY_START_PORT
for (const nodeId of ctx.config.nodeIds) {
subTasks.push({
title: `Check proxy for node: ${chalk.yellow(nodeId)}`,
task: async () => await self.checkNetworkNodeProxyUp(ctx.config.namespace, nodeId)
task: async () => await self.checkNetworkNodeProxyUp(ctx.config.namespace, nodeId, localPort++)
})
}

Expand All @@ -664,45 +680,48 @@ export class NodeCommand extends BaseCommand {
throw new FullstackTestingError(`Error starting node: ${e.message}`, e)
} finally {
await self.accountManager.close()
await self.close()
}

return true
}

async checkNetworkNodeProxyUp (namespace, nodeId, maxAttempts = 10, delay = 5000) {
const podArray = await this.k8.getPodsByLabel([`app=haproxy-${nodeId}`, 'fullstack.hedera.com/type=haproxy'])
/**
* Check if the network node proxy is up, requires close() to be called after
* @param namespace the namespace
* @param nodeId the node id
* @param localPort the local port to forward to
* @param maxAttempts the maximum number of attempts
* @param delay the delay between attempts
* @returns {Promise<boolean>} true if the proxy is up
*/
async checkNetworkNodeProxyUp (namespace, nodeId, localPort, maxAttempts = 10, delay = 5000) {
const podArray = await this.k8.getPodsByLabel(namespace, [`app=haproxy-${nodeId}`, 'fullstack.hedera.com/type=haproxy'])

let attempts = 0
if (podArray.length > 0) {
const podName = podArray[0].metadata.name
this._portForwards.push(await this.k8.portForward(podName, localPort, 5555, namespace))
try {
await this.k8.testConnection('localhost', localPort)
} catch (e) {
throw new FullstackTestingError(`failed to create port forward for '${nodeId}' proxy on port ${localPort}`, e)
}

while (attempts < maxAttempts) {
const logResponse = await this.k8.kubeClient.readNamespacedPodLog(
podName,
namespace,
'haproxy',
undefined,
undefined,
1024,
undefined,
undefined,
undefined,
4
)

if (logResponse.response.statusCode !== 200) {
throw new FullstackTestingError(`Expected pod ${podName} log query to execute successful, but instead got a status of ${logResponse.response.statusCode}`)
}
try {
const status = await this.getNodeProxyStatus(`http://localhost:${localPort}/v2/services/haproxy/stats/native?type=backend`)
if (status === 'UP') {
this.logger.debug(`Proxy ${podName} is UP. [attempt: ${attempts}/${maxAttempts}]`)
return true
}

this.logger.debug(`Received HAProxy log from ${podName}`, { output: logResponse.body })
if (logResponse.body.includes('Server be_servers/server1 is UP')) {
this.logger.debug(`Proxy ${podName} is UP [attempt: ${attempts}/${maxAttempts}]`)
return true
attempts++
this.logger.debug(`Proxy ${podName} is not UP. Checking again in ${delay}ms ... [attempt: ${attempts}/${maxAttempts}]`)
await sleep(delay)
} catch (e) {
throw new FullstackTestingError(`failed to create port forward for '${nodeId}' proxy on port ${localPort}`, e)
}

attempts++
this.logger.debug(`Proxy ${podName} is not UP. Checking again in ${delay}ms ... [attempt: ${attempts}/${maxAttempts}]`)
await sleep(delay)
}
}

Expand Down Expand Up @@ -967,4 +986,32 @@ export class NodeCommand extends BaseCommand {
}
}
}

async getNodeProxyStatus (url) {
try {
this.logger.debug(`Fetching proxy status from: ${url}`)
const res = await fetch(url, {
method: 'GET',
signal: AbortSignal.timeout(5000),
headers: {
Authorization: `Basic ${Buffer.from(
`${constants.NODE_PROXY_USER_ID}:${constants.NODE_PROXY_PASSWORD}`).toString(
'base64')}`
}
})
const response = await res.json()

if (res.status === 200) {
const status = response[0]?.stats?.filter(
(stat) => stat.name === 'http_backend')[0]?.stats?.status
this.logger.debug(`Proxy status: ${status}`)
return status
} else {
this.logger.debug(`Proxy request status code: ${res.status}`)
return null
}
} catch (e) {
this.logger.error(`Error in fetching proxy status: ${e.message}`, e)
}
}
}
32 changes: 1 addition & 31 deletions src/core/account_manager.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import {
TransferTransaction
} from '@hashgraph/sdk'
import { FullstackTestingError, MissingArgumentError } from './errors.mjs'
import net from 'net'
import { Templates } from './templates.mjs'

const REASON_FAILED_TO_GET_KEYS = 'failed to get keys for accountId'
Expand All @@ -58,8 +57,6 @@ const REJECTED = 'rejected'
*
*/
export class AccountManager {
static _openPortForwardConnections = 0

/**
* creates a new AccountManager instance
* @param logger the logger to use
Expand Down Expand Up @@ -202,12 +199,10 @@ export class AccountManager {

if (usePortForward) {
this._portForwards.push(await this.k8.portForward(serviceObject.podName, localPort, port))
AccountManager._openPortForwardConnections++
}

nodes[`${host}:${targetPort}`] = AccountId.fromString(serviceObject.accountId)
await this.testConnection(serviceObject.podName, host, targetPort)

await this.k8.testConnection(host, targetPort)
localPort++
}

Expand Down Expand Up @@ -464,31 +459,6 @@ export class AccountManager {
return receipt.status === Status.Success
}

/**
* to test the connection to the node within the network
* @param podName the podName is only used for logging messages and errors
* @param host the host of the target connection
* @param port the port of the target connection
* @returns {Promise<boolean>}
*/
async testConnection (podName, host, port) {
const self = this

return new Promise((resolve, reject) => {
const s = new net.Socket()
s.on('error', (e) => {
s.destroy()
reject(new FullstackTestingError(`failed to connect to '${host}:${port}': ${e.message}`, e))
})

s.connect(port, host, () => {
self.logger.debug(`Connection test successful: ${host}:${port}`)
s.destroy()
resolve(true)
})
})
}

/**
* creates a new Hedera account
* @param namespace the namespace to store the Kubernetes key secret into
Expand Down
3 changes: 3 additions & 0 deletions src/core/constants.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,10 @@ export const GENESIS_KEY = process.env.GENESIS_KEY || '302e020100300506032b65700
export const SYSTEM_ACCOUNTS = [[3, 100], [200, 349], [400, 750], [900, 1000]] // do account 0.0.2 last and outside the loop
export const TREASURY_ACCOUNT = 2
// first local port used when port-forwarding node endpoints; presumably incremented per node — TODO confirm against caller
export const LOCAL_NODE_START_PORT = process.env.LOCAL_NODE_START_PORT || 30212
// first local port used when port-forwarding haproxy stats endpoints; incremented once per node by the proxy check task
// NOTE: an env override yields a string, not a number — consumers pass it straight to port APIs
export const LOCAL_NODE_PROXY_START_PORT = process.env.LOCAL_NODE_PROXY_START_PORT || 30313
export const ACCOUNT_CREATE_BATCH_SIZE = process.env.ACCOUNT_CREATE_BATCH_SIZE || 50
// basic-auth credentials for the HAProxy Data Plane API stats endpoint
// NOTE(review): hardcoded default password — acceptable for local test networks only; verify it is never used in production
export const NODE_PROXY_USER_ID = process.env.NODE_PROXY_USER_ID || 'admin'
export const NODE_PROXY_PASSWORD = process.env.NODE_PROXY_PASSWORD || 'adminpwd'

export const POD_STATUS_RUNNING = 'Running'
export const POD_STATUS_READY = 'Ready'
Expand Down
36 changes: 31 additions & 5 deletions src/core/k8.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -197,14 +197,14 @@ export class K8 {

/**
* Get pods by labels
* @param namespace the namespace of the pods
* @param labels list of labels
* @return {Promise<Array<V1Pod>>}
*/
async getPodsByLabel (labels = []) {
const ns = this._getNamespace()
async getPodsByLabel (namespace, labels = []) {
const labelSelector = labels.join(',')
const result = await this.kubeClient.listNamespacedPod(
ns,
namespace,
undefined,
undefined,
undefined,
Expand Down Expand Up @@ -661,19 +661,45 @@ export class K8 {
* @param podName pod name
* @param localPort local port
* @param podPort port of the pod
* @param namespace namespace of the pod (optional)
*/
async portForward (podName, localPort, podPort) {
const ns = this._getNamespace()
async portForward (podName, localPort, podPort, namespace = null) {
const ns = namespace || this._getNamespace()
const forwarder = new k8s.PortForward(this.kubeConfig, false)
const server = await net.createServer((socket) => {
forwarder.portForward(ns, podName, [podPort], socket, null, socket, 3)
})

// add info for logging
server.info = `${podName}:${podPort} -> ${constants.LOCAL_HOST}:${localPort}`
this.logger.debug(`Starting port-forwarder [${server.info}]`)
return server.listen(localPort, constants.LOCAL_HOST)
}

/**
* to test the connection to a pod within the network
* @param host the host of the target connection
* @param port the port of the target connection
* @returns {Promise<boolean>}
*/
async testConnection (host, port) {
const self = this

return new Promise((resolve, reject) => {
const s = new net.Socket()
s.on('error', (e) => {
s.destroy()
reject(new FullstackTestingError(`failed to connect to '${host}:${port}': ${e.message}`, e))
})

s.connect(port, host, () => {
self.logger.debug(`Connection test successful: ${host}:${port}`)
s.destroy()
resolve(true)
})
})
}

/**
* Stop the port forwarder server
*
Expand Down
13 changes: 13 additions & 0 deletions test/e2e/commands/01_node.test.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -97,5 +97,18 @@ describe.each([
expect(e).toBeNull()
}
}, 20000)

// verifies the haproxy sidecar for node0 reports UP via the stats endpoint
it('Node Proxy should be UP', async () => {
  expect.assertions(1)

  try {
    // 30313 matches constants.LOCAL_NODE_PROXY_START_PORT — the local port the stats endpoint is forwarded to
    await expect(nodeCmd.checkNetworkNodeProxyUp(namespace, 'node0', 30313)).resolves.toBeTruthy()
  } catch (e) {
    nodeCmd.logger.showUserError(e)
    expect(e).toBeNull() // force a failure with the error attached
  } finally {
    // checkNetworkNodeProxyUp opens a port forward; close() releases it even on failure
    await nodeCmd.close()
  }
}, 20000)
})
})
4 changes: 2 additions & 2 deletions test/e2e/core/account_manager.test.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,13 @@ describe('AccountManager', () => {

// ports should be opened
accountManager._portForwards.push(await k8.portForward(podName, localPort, podPort))
const status = await accountManager.testConnection(podName, localHost, localPort)
const status = await k8.testConnection(localHost, localPort)
expect(status).toBeTruthy()

// ports should be closed
await accountManager.close()
try {
await accountManager.testConnection(podName, localHost, localPort)
await k8.testConnection(localHost, localPort)
} catch (e) {
expect(e.message.includes(`failed to connect to '${localHost}:${localPort}'`)).toBeTruthy()
}
Expand Down

0 comments on commit a907f45

Please sign in to comment.