Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use certifi to workaround outdated CACert issue on Windows #26

42 changes: 14 additions & 28 deletions astrolabe/spec_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,9 @@
import json
import logging
import os
import signal
import subprocess
import sys
from time import sleep
from urllib.parse import urlencode

from pymongo import MongoClient
from tabulate import tabulate
import junitparser
import yaml
Expand All @@ -34,7 +30,8 @@
from astrolabe.poller import BooleanCallablePoller
from astrolabe.utils import (
assert_subset, encode_cdata, get_cluster_name,
get_test_name_from_spec_file, SingleTestXUnitLogger, Timer)
get_test_name_from_spec_file, load_test_data,
DriverWorkloadSubprocessRunner, SingleTestXUnitLogger, Timer)


LOGGER = logging.getLogger(__name__)
Expand All @@ -54,11 +51,8 @@ def __init__(self, *, client, test_name, cluster_name, specification,
# Initialize attribute used for memoization of connection string.
self.__connection_string = None

# Account for platform-specific interrupt signals.
if sys.platform != 'win32':
self.sigint = signal.SIGINT
else:
self.sigint = signal.CTRL_C_EVENT
# Initialize wrapper class for running workload executor.
self.workload_runner = DriverWorkloadSubprocessRunner()

# Validate and store organization and group.
self.organization = get_one_organization_by_name(
Expand Down Expand Up @@ -157,25 +151,18 @@ def run(self, persist_cluster=False):
LOGGER.info("Loading test data on cluster {!r}".format(
self.cluster_name))
connection_string = self.get_connection_string()
client = MongoClient(connection_string, w="majority")
coll = client.get_database(
self.spec.driverWorkload.database).get_collection(
self.spec.driverWorkload.collection)
coll.drop()
coll.insert_many(test_data)
load_test_data(connection_string, self.spec.driverWorkload)
LOGGER.info("Successfully loaded test data on cluster {!r}".format(
self.cluster_name))

# Step-2: run driver workload.
LOGGER.info("Starting workload executor")
connection_string = self.get_connection_string()
driver_workload = json.dumps(self.spec.driverWorkload)
worker_subprocess = subprocess.Popen([
self.config.workload_executor, connection_string,
driver_workload], preexec_fn=os.setsid,
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
self.workload_runner.spawn(
workload_executor=self.config.workload_executor,
connection_string=self.get_connection_string(),
driver_workload=json.dumps(self.spec.driverWorkload))
LOGGER.info("Started workload executor [PID: {}]".format(
worker_subprocess.pid))
self.workload_runner.pid))

# Step-3: begin maintenance routine.
final_config = self.spec.maintenancePlan.final
Expand Down Expand Up @@ -208,11 +195,10 @@ def run(self, persist_cluster=False):

# Step-5: interrupt driver workload and capture streams
LOGGER.info("Stopping workload executor [PID: {}]".format(
worker_subprocess.pid))
os.killpg(worker_subprocess.pid, self.sigint)
stdout, stderr = worker_subprocess.communicate(timeout=10)
self.workload_runner.pid))
stdout, stderr = self.workload_runner.terminate()
LOGGER.info("Stopped workload executor [exit code: {}]".format(
worker_subprocess.returncode))
self.workload_runner.returncode))

# Stop the timer
timer.stop()
Expand All @@ -227,7 +213,7 @@ def run(self, persist_cluster=False):
err_info = {'numErrors': -1, 'numFailures': -1}

if err_info['numErrors'] or err_info['numFailures'] \
or worker_subprocess.returncode != 0:
or self.workload_runner.returncode != 0:
LOGGER.info("FAILED: {!r}".format(self.id))
# Write xunit logs for failed tests.
errmsg = ("Number of errors: {numErrors}\n"
Expand Down
61 changes: 61 additions & 0 deletions astrolabe/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,18 @@

import logging
import os
import signal
import subprocess
import sys
from functools import partial
from hashlib import sha256
from time import monotonic

import click
import junitparser

from pymongo import MongoClient


class ClickLogHandler(logging.Handler):
"""Handler for print log statements via Click's echo functionality."""
Expand Down Expand Up @@ -116,3 +122,58 @@ def get_cluster_name(test_name, name_salt):
name_hash = sha256(test_name.encode('utf-8'))
name_hash.update(name_salt.encode('utf-8'))
return name_hash.hexdigest()[:10]


def load_test_data(connection_string, driver_workload):
"""Insert the test data into the cluster."""
kwargs = {'w': "majority"}
try:
import certifi
kwargs['tlsCAFile'] = certifi.where()
except ImportError:
pass

client = MongoClient(connection_string, **kwargs)
coll = client.get_database(
driver_workload.database).get_collection(
driver_workload.collection)
coll.drop()
coll.insert(driver_workload.testData)


class DriverWorkloadSubprocessRunner:
"""Convenience wrapper to run a workload executor in a subprocess."""
def __init__(self):
self.is_windows = False
if sys.platform in ("win32", "cygwin"):
self.is_windows = True
self.workload_subprocess = None

@property
def pid(self):
return self.workload_subprocess.pid

@property
def returncode(self):
return self.workload_subprocess.returncode

def spawn(self, *, workload_executor, connection_string, driver_workload):
if not self.is_windows:
self.workload_subprocess = subprocess.Popen([
workload_executor, connection_string, driver_workload],
preexec_fn=os.setsid, stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
else:
self.workload_subprocess = subprocess.Popen([
workload_executor, connection_string, driver_workload],
creationflags=subprocess.CREATE_NEW_PROCESS_GROUP,
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
return self.workload_subprocess

def terminate(self):
if not self.is_windows:
os.killpg(self.workload_subprocess.pid, signal.SIGINT)
else:
os.kill(self.workload_subprocess.pid, signal.CTRL_C_EVENT)
stdout, stderr = self.workload_subprocess.communicate(timeout=10)
return stdout, stderr
20 changes: 12 additions & 8 deletions setup.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import io
import os
import sys

from setuptools import setup

Expand All @@ -21,6 +22,16 @@
readme_content = ''


# Dynamically generate requirements.
install_requires = [
'click>=7,<8', 'requests>=2,<3',
'pymongo>=3.10,<4', 'dnspython>=1.16,<2',
'pyyaml>=5,<6', 'tabulate>=0.8,<0.9',
'junitparser>=1,<2']
if sys.platform in ("win32", "cygwin"):
install_requires.append("certifi")


setup(
name='astrolabe',
version=version['__version__'],
Expand All @@ -34,14 +45,7 @@
license="Apache License, Version 2.0",
python_requires=">=3.5",
packages=["atlasclient", "astrolabe"],
install_requires=[
'click>=7,<8',
'requests>=2,<3',
'pymongo>=3.10,<4',
'dnspython>=1.16,<2',
'pyyaml>=5,<6',
'tabulate>=0.8,<0.9',
'junitparser>=1,<2'],
install_requires=install_requires,
entry_points={
'console_scripts': ['astrolabe=astrolabe.cli:cli']},
classifiers=[
Expand Down