Skip to content

Commit

Permalink
apply black to e2elive.py as well (#1154)
Browse files Browse the repository at this point in the history
  • Loading branch information
tzaffi authored Jul 29, 2022
1 parent 1766a0d commit e6f6add
Showing 1 changed file with 97 additions and 41 deletions.
138 changes: 97 additions & 41 deletions misc/e2elive.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,33 @@

logger = logging.getLogger(__name__)


def main():
start = time.time()
import argparse

ap = argparse.ArgumentParser()
ap.add_argument('--keep-temps', default=False, action='store_true')
ap.add_argument('--indexer-bin', default=None, help='path to algorand-indexer binary, otherwise search PATH')
ap.add_argument('--indexer-port', default=None, type=int, help='port to run indexer on. defaults to random in [4000,30000]')
ap.add_argument('--connection-string', help='Use this connection string instead of attempting to manage a local database.')
ap.add_argument('--source-net', help='Path to test network directory containing Primary and other nodes. May be a tar file.')
ap.add_argument('--verbose', default=False, action='store_true')
ap.add_argument("--keep-temps", default=False, action="store_true")
ap.add_argument(
"--indexer-bin",
default=None,
help="path to algorand-indexer binary, otherwise search PATH",
)
ap.add_argument(
"--indexer-port",
default=None,
type=int,
help="port to run indexer on. defaults to random in [4000,30000]",
)
ap.add_argument(
"--connection-string",
help="Use this connection string instead of attempting to manage a local database.",
)
ap.add_argument(
"--source-net",
help="Path to test network directory containing Primary and other nodes. May be a tar file.",
)
ap.add_argument("--verbose", default=False, action="store_true")
args = ap.parse_args()
if args.verbose:
logging.basicConfig(level=logging.DEBUG)
Expand All @@ -41,9 +58,9 @@ def main():
sourcenet = args.source_net
source_is_tar = False
if not sourcenet:
e2edata = os.getenv('E2EDATA')
sourcenet = e2edata and os.path.join(e2edata, 'net')
if sourcenet and hassuffix(sourcenet, '.tar', '.tar.gz', '.tar.bz2', '.tar.xz'):
e2edata = os.getenv("E2EDATA")
sourcenet = e2edata and os.path.join(e2edata, "net")
if sourcenet and hassuffix(sourcenet, ".tar", ".tar.gz", ".tar.bz2", ".tar.xz"):
source_is_tar = True
tempdir = tempfile.mkdtemp()
if not args.keep_temps:
Expand All @@ -52,67 +69,99 @@ def main():
logger.info("leaving temp dir %r", tempdir)
if not (source_is_tar or (sourcenet and os.path.isdir(sourcenet))):
# fetch test data from S3
bucket = 'algorand-testdata'
bucket = "algorand-testdata"
import boto3
from botocore.config import Config
from botocore import UNSIGNED
s3 = boto3.client('s3', config=Config(signature_version=UNSIGNED))
tarname = 'net_done.tar.bz2'

s3 = boto3.client("s3", config=Config(signature_version=UNSIGNED))
tarname = "net_done.tar.bz2"
tarpath = os.path.join(tempdir, tarname)
firstFromS3Prefix(s3, bucket, 'indexer/e2e4', tarname, outpath=tarpath)
firstFromS3Prefix(s3, bucket, "indexer/e2e4", tarname, outpath=tarpath)
source_is_tar = True
sourcenet = tarpath
tempnet = os.path.join(tempdir, 'net')
tempnet = os.path.join(tempdir, "net")
if source_is_tar:
xrun(['tar', '-C', tempdir, '-x', '-f', sourcenet])
xrun(["tar", "-C", tempdir, "-x", "-f", sourcenet])
else:
xrun(['rsync', '-a', sourcenet + '/', tempnet + '/'])
blockfiles = glob.glob(os.path.join(tempdir, 'net', 'Primary', '*', '*.block.sqlite'))
xrun(["rsync", "-a", sourcenet + "/", tempnet + "/"])
blockfiles = glob.glob(
os.path.join(tempdir, "net", "Primary", "*", "*.block.sqlite")
)
lastblock = countblocks(blockfiles[0])
#subprocess.run(['find', tempnet, '-type', 'f'])
# subprocess.run(['find', tempnet, '-type', 'f'])
try:
xrun(['goal', 'network', 'start', '-r', tempnet])
xrun(["goal", "network", "start", "-r", tempnet])
except Exception:
logger.error('failed to start private network, looking for node.log')
logger.error("failed to start private network, looking for node.log")
for root, dirs, files in os.walk(tempnet):
for f in files:
if f == 'node.log':
if f == "node.log":
p = os.path.join(root, f)
logger.error('found node.log: {}'.format(p))
logger.error("found node.log: {}".format(p))
with open(p) as nf:
for line in nf:
logger.error(' {}'.format(line))
logger.error(" {}".format(line))
raise

atexitrun(['goal', 'network', 'stop', '-r', tempnet])
atexitrun(["goal", "network", "stop", "-r", tempnet])

psqlstring = ensure_test_db(args.connection_string, args.keep_temps)
algoddir = os.path.join(tempnet, 'Primary')
aiport = args.indexer_port or random.randint(4000,30000)
cmd = [indexer_bin, 'daemon', '--data-dir', tempdir, '-P', psqlstring, '--dev-mode', '--algod', algoddir, '--server', ':{}'.format(aiport)]
logger.debug("%s", ' '.join(map(repr,cmd)))
algoddir = os.path.join(tempnet, "Primary")
aiport = args.indexer_port or random.randint(4000, 30000)
cmd = [
indexer_bin,
"daemon",
"--data-dir",
tempdir,
"-P",
psqlstring,
"--dev-mode",
"--algod",
algoddir,
"--server",
":{}".format(aiport),
]
logger.debug("%s", " ".join(map(repr, cmd)))
indexerdp = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
indexerout = subslurp(indexerdp.stdout)
indexerout.start()
atexit.register(indexerdp.kill)
time.sleep(0.2)

indexerurl = 'http://localhost:{}/'.format(aiport)
healthurl = indexerurl + 'health'
indexerurl = "http://localhost:{}/".format(aiport)
healthurl = indexerurl + "health"
for attempt in range(20):
(ok, json) = tryhealthurl(healthurl, args.verbose, waitforround=lastblock)
if ok:
logger.debug('health round={} OK'.format(lastblock))
logger.debug("health round={} OK".format(lastblock))
break
time.sleep(0.5)
if not ok:
logger.error('could not get indexer health, or did not reach round={}\n{}'.format(lastblock, json))
logger.error(
"could not get indexer health, or did not reach round={}\n{}".format(
lastblock, json
)
)
sys.stderr.write(indexerout.dump())
return 1
try:
logger.info('reached expected round={}'.format(lastblock))
xrun(['python3', 'misc/validate_accounting.py', '--verbose', '--algod', algoddir, '--indexer', indexerurl], timeout=20)
xrun(['go', 'run', 'cmd/e2equeries/main.go', '-pg', psqlstring, '-q'], timeout=15)
logger.info("reached expected round={}".format(lastblock))
xrun(
[
"python3",
"misc/validate_accounting.py",
"--verbose",
"--algod",
algoddir,
"--indexer",
indexerurl,
],
timeout=20,
)
xrun(
["go", "run", "cmd/e2equeries/main.go", "-pg", psqlstring, "-q"], timeout=15
)
except Exception:
sys.stderr.write(indexerout.dump())
raise
Expand All @@ -121,12 +170,14 @@ def main():

return 0


def hassuffix(x, *suffixes):
for s in suffixes:
if x.endswith(s):
return True
return False


def countblocks(path):
db = sqlite3.connect(path)
cursor = db.cursor()
Expand All @@ -136,49 +187,54 @@ def countblocks(path):
db.close()
return row[0]


def tryhealthurl(healthurl, verbose=False, waitforround=100):
try:
response = urllib.request.urlopen(healthurl)
if response.code != 200:
return (False, "")
raw = response.read()
logger.debug('health %r', raw)
logger.debug("health %r", raw)
ob = json.loads(raw)
rt = ob.get('message')
rt = ob.get("message")
if not rt:
return (False, raw)
return (int(rt) >= waitforround, raw)
except Exception as e:
if verbose:
logging.warning('GET %s %s', healthurl, e)
logging.warning("GET %s %s", healthurl, e)
return (False, "")


class subslurp:
# asynchronously accumulate stdout or stderr from a subprocess and hold it for debugging if something goes wrong
def __init__(self, f):
self.f = f
self.buf = io.BytesIO()
self.gz = gzip.open(self.buf, 'wb')
self.gz = gzip.open(self.buf, "wb")
self.l = threading.Lock()
self.t = None

def run(self):
for line in self.f:
with self.l:
if self.gz is None:
return
self.gz.write(line)

def dump(self):
with self.l:
self.gz.close()
self.gz = None
self.buf.seek(0)
r = gzip.open(self.buf, 'rt')
r = gzip.open(self.buf, "rt")
return r.read()

def start(self):
self.t = threading.Thread(target=self.run)
self.t.daemon = True
self.t.start()


if __name__ == '__main__':
if __name__ == "__main__":
sys.exit(main())

0 comments on commit e6f6add

Please sign in to comment.