Skip to content

Commit

Permalink
Merge pull request #173 from cliveseldon/distributed_deploy
Browse files Browse the repository at this point in the history
Distributed deployment and Istio
  • Loading branch information
gsunner authored Jun 28, 2018
2 parents e08e135 + 63fe93f commit 93d875d
Show file tree
Hide file tree
Showing 73 changed files with 1,734 additions and 496 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -87,3 +87,4 @@ examples/models/pyspark_pmml/derby.log
examples/models/pyspark_pmml/mnist_train.csv
examples/models/pyspark_pmml/src/main/resources/model.pmml
examples/models/pyspark_pmml/.gitignore
examples/models/h2o_mojo/src/main/resources/model.zip
2 changes: 1 addition & 1 deletion api-frontend/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

<groupId>io.seldon.apife</groupId>
<artifactId>seldon-apife</artifactId>
<version>0.1.9-SNAPSHOT</version>
<version>0.1.8-ISTIO-SNAPSHOT</version>
<packaging>jar</packaging>

<name>api-frontend</name>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

import io.seldon.apife.api.oauth.InMemoryClientDetailsService;
import io.seldon.apife.deployments.DeploymentStore;
import io.seldon.apife.k8s.DeploymentWatcher;
import io.seldon.protos.DeploymentProtos.DeploymentSpec;
import io.seldon.protos.DeploymentProtos.SeldonDeployment;

Expand Down Expand Up @@ -83,7 +84,7 @@ public void configure(ClientDetailsServiceConfigurer clients) throws Exception {
String client_secret = System.getenv().get(TEST_CLIENT_SECRET);
clientDetailsService.addClient(client_key,client_secret);
SeldonDeployment dep = SeldonDeployment.newBuilder()
.setApiVersion("v1alpha1")
.setApiVersion(DeploymentWatcher.VERSION)
.setKind("SeldonDeplyment")
.setSpec(DeploymentSpec.newBuilder()
.setName("localhost")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import io.seldon.apife.deployments.DeploymentsHandler;
import io.seldon.apife.deployments.DeploymentsListener;
import io.seldon.apife.exception.SeldonAPIException;
import io.seldon.apife.k8s.DeploymentWatcher;
import io.seldon.protos.DeploymentProtos.DeploymentSpec;
import io.seldon.protos.DeploymentProtos.Endpoint;
import io.seldon.protos.DeploymentProtos.SeldonDeployment;
Expand Down Expand Up @@ -179,7 +180,7 @@ private void blockUntilShutdown() throws InterruptedException {
public static void main(String[] args) throws Exception {
DeploymentStore store = new DeploymentStore(null,new InMemoryClientDetailsService());
SeldonDeployment dep = SeldonDeployment.newBuilder()
.setApiVersion("v1alpha1")
.setApiVersion(DeploymentWatcher.VERSION)
.setKind("SeldonDeplyment")
.setSpec(DeploymentSpec.newBuilder()
.setName("0.0.0.0")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@ public class DeploymentWatcher implements DeploymentsHandler{

protected static Logger logger = LoggerFactory.getLogger(DeploymentWatcher.class.getName());

public static final String GROUP = "machinelearning.seldon.io";
public static final String VERSION = "v1alpha2";
public static final String KIND_PLURAL = "seldondeployments";
public static final String KIND = "SeldonDeployment";

private ApiClient client;
private int resourceVersion = 0;
private int resourceVersionProcessed = 0;
Expand Down Expand Up @@ -124,7 +129,7 @@ public int watchSeldonSeldonDeployments(int resourceVersion,int resourceVersionP
CustomObjectsApi api = new CustomObjectsApi();
Watch<Object> watch = Watch.createWatch(
client,
api.listNamespacedCustomObjectCall("machinelearning.seldon.io", "v1alpha1", namespace, "seldondeployments", null, null, rs, true, null, null),
api.listNamespacedCustomObjectCall(GROUP, VERSION, namespace, KIND_PLURAL, null, null, rs, true, null, null),
new TypeToken<Watch.Response<Object>>(){}.getType());

int maxResourceVersion = resourceVersion;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.concurrent.TimeUnit;

import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
Expand Down Expand Up @@ -56,7 +57,7 @@ public class InternalPredictionService {
@Autowired
public InternalPredictionService(AppProperties appProperties){
this.appProperties = appProperties;
connectionManager = new PoolingHttpClientConnectionManager();
connectionManager = new PoolingHttpClientConnectionManager(10,TimeUnit.SECONDS);
connectionManager.setMaxTotal(150);
connectionManager.setDefaultMaxPerRoute(150);

Expand Down
6 changes: 1 addition & 5 deletions cluster-manager/Makefile.ci
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ VERSION_FILE=target/version.txt
build: clean build_image


build_jar: update_proto k8s_java_client
build_jar: update_proto
mvn clean package -B

write_version: build_jar
Expand Down Expand Up @@ -40,8 +40,4 @@ update_proto: download_protos
cp -vr ../proto/k8s/k8s.io src/main/proto
cp -v ../proto/k8s/v1.proto src/main/proto

k8s_java_client:
git clone -b updated_protos https://github.com/cliveseldon/java.git java_client && \
cd java_client && \
mvn install

4 changes: 4 additions & 0 deletions cluster-manager/README.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
Local testing:

export SELDON_CLUSTER_MANAGER_POD_NAMESPACE=seldon
export ENGINE_CONTAINER_IMAGE_AND_VERSION=seldonio/engine:0.1.8-SNAPSHOT
4 changes: 2 additions & 2 deletions cluster-manager/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<groupId>io.seldon.clustermanager</groupId>
<artifactId>seldon-cluster-manager</artifactId>
<packaging>jar</packaging>
<version>0.1.9-SNAPSHOT</version>
<version>0.1.8-ISTIO-SNAPSHOT</version>
<name>seldon-cluster-manager</name>
<url>http://maven.apache.org</url>

Expand Down Expand Up @@ -81,7 +81,7 @@
<dependency>
<groupId>io.kubernetes</groupId>
<artifactId>client-java</artifactId>
<version>1.0.0-beta2-SNAPSHOT</version>
<version>1.0.0</version>
<scope>compile</scope>
</dependency>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@
package io.seldon.clustermanager.k8s;

import io.kubernetes.client.models.ExtensionsV1beta1DeploymentList;
import io.kubernetes.client.models.V1ServiceList;
import io.seldon.protos.DeploymentProtos.SeldonDeployment;

public interface KubeCRDHandler {

public void updateSeldonDeployment(SeldonDeployment mlDep);
public SeldonDeployment getSeldonDeployment(String name);
public ExtensionsV1beta1DeploymentList getOwnedDeployments(String seldonDeploymentName);
public V1ServiceList getOwnedServices(String seldonDeploymentName);
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,11 @@

import io.kubernetes.client.ApiClient;
import io.kubernetes.client.ApiException;
import io.kubernetes.client.apis.CoreV1Api;
import io.kubernetes.client.apis.CustomObjectsApi;
import io.kubernetes.client.apis.ExtensionsV1beta1Api;
import io.kubernetes.client.models.ExtensionsV1beta1DeploymentList;
import io.kubernetes.client.models.V1ServiceList;
import io.kubernetes.client.proto.Meta.ObjectMeta;
import io.kubernetes.client.util.Config;
import io.seldon.clustermanager.ClusterManagerProperites;
Expand All @@ -43,7 +45,7 @@ public class KubeCRDHandlerImpl implements KubeCRDHandler {
private final static Logger logger = LoggerFactory.getLogger(KubeCRDHandlerImpl.class);

public static final String GROUP = "machinelearning.seldon.io";
public static final String VERSION = "v1alpha1";
public static final String VERSION = "v1alpha2";
public static final String KIND_PLURAL = "seldondeployments";
public static final String KIND = "SeldonDeployment";

Expand Down Expand Up @@ -115,7 +117,7 @@ public ExtensionsV1beta1DeploymentList getOwnedDeployments(String seldonDeployme
{
ApiClient client = Config.defaultClient();
ExtensionsV1beta1Api api = new ExtensionsV1beta1Api(client);
ExtensionsV1beta1DeploymentList l = api.listNamespacedDeployment(namespace, null, null, null, false, Constants.LABEL_SELDON_ID+"="+seldonDeploymentName, 1, null, null, false);
ExtensionsV1beta1DeploymentList l = api.listNamespacedDeployment(namespace, null, null, null, false, Constants.LABEL_SELDON_ID+"="+seldonDeploymentName, null, null, null, false);
return l;
} catch (IOException e) {
logger.error("Failed to get deployment list for "+seldonDeploymentName,e);
Expand All @@ -125,6 +127,25 @@ public ExtensionsV1beta1DeploymentList getOwnedDeployments(String seldonDeployme
return null;
}
}

@Override
public V1ServiceList getOwnedServices(String seldonDeploymentName) {
try
{
ApiClient client = Config.defaultClient();
io.kubernetes.client.apis.CoreV1Api api = new CoreV1Api(client);
V1ServiceList l = api.listNamespacedService(namespace, null, null, null, false, Constants.LABEL_SELDON_ID+"="+seldonDeploymentName, null, null, null, null);
return l;
} catch (IOException e) {
logger.error("Failed to get deployment list for "+seldonDeploymentName,e);
return null;
} catch (ApiException e) {
logger.error("Failed to get deployment list for "+seldonDeploymentName,e);
return null;
}
}




}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import io.kubernetes.client.ProtoClient.ObjectOrStatus;
import io.kubernetes.client.models.ExtensionsV1beta1Deployment;
import io.kubernetes.client.models.ExtensionsV1beta1DeploymentList;
import io.kubernetes.client.models.V1Service;
import io.kubernetes.client.models.V1ServiceList;
import io.kubernetes.client.proto.Meta.DeleteOptions;
import io.kubernetes.client.proto.V1.Service;
import io.kubernetes.client.proto.V1beta1Extensions.Deployment;
Expand Down Expand Up @@ -107,7 +109,7 @@ private void createDeployments(ProtoClient client,String namespace,List<Deployme

private Set<String> getDeploymentNames(List<Deployment> deployments)
{
Set<String> names = new HashSet<>();
Set<String> names = new HashSet<>();
for(Deployment d : deployments)
names.add(d.getMetadata().getName());
return names;
Expand Down Expand Up @@ -137,37 +139,72 @@ private void removeDeployments(ProtoClient client,String namespace,SeldonDeploym
}
}

private void createService(ProtoClient client,String namespace,Service service) throws ApiException, IOException, SeldonDeploymentException
private void removeServices(ProtoClient client,String namespace,SeldonDeployment seldonDeployment,List<Service> services) throws ApiException, IOException, SeldonDeploymentException
{
final String serviceApiPath = "/api/v1/namespaces/{namespace}/services/{name}"
.replaceAll("\\{" + "name" + "\\}", client.getApiClient().escapeString(service.getMetadata().getName()))
.replaceAll("\\{" + "namespace" + "\\}", client.getApiClient().escapeString(namespace));
ObjectOrStatus os = client.list(Service.newBuilder(),serviceApiPath);
if (os.status != null)
{
if (os.status.getCode() == 404)
{
String serviceCreateApiPath = "/api/v1/namespaces/{namespace}/services"
.replaceAll("\\{" + "namespace" + "\\}", client.getApiClient().escapeString(namespace));
os = client.create(service, serviceCreateApiPath, "v1", "Service");
if (os.status != null)
{
logger.error("Error creating service "+ProtoBufUtils.toJson(os.status));
throw new SeldonDeploymentException("Failed to create service "+service.getMetadata().getName());
Set<String> names = getServiceNames(services);
V1ServiceList svcList = crdHandler.getOwnedServices(seldonDeployment.getSpec().getName());
for(V1Service s : svcList.getItems())
{
if (!names.contains(s.getMetadata().getName()))
{
final String deleteApiPath = "/apis/v1/namespaces/{namespace}/services/{name}"
.replaceAll("\\{" + "name" + "\\}", client.getApiClient().escapeString(s.getMetadata().getName()))
.replaceAll("\\{" + "namespace" + "\\}", client.getApiClient().escapeString(namespace));
DeleteOptions options = DeleteOptions.newBuilder().setPropagationPolicy("Foreground").build();
ObjectOrStatus<Deployment> os = client.delete(Deployment.newBuilder(),deleteApiPath,options);
if (os.status != null) {
logger.error("Error deleting deployment:"+ProtoBufUtils.toJson(os.status));
throw new SeldonDeploymentException("Failed to delete service "+s.getMetadata().getName());
}
else
{
logger.debug("Created service:"+ProtoBufUtils.toJson(os.object));
}
}
else
{
logger.error("Error listing service:"+ProtoBufUtils.toJson(os.status));
throw new SeldonDeploymentException("Failed to list service "+service.getMetadata().getName());
}
}
else
logger.debug("No creating service as already exists "+service.getMetadata().getName());
else {
logger.debug("Deleted deployment:"+ProtoBufUtils.toJson(os.object));
}
}
}
}

private Set<String> getServiceNames(List<Service> services)
{
Set<String> names = new HashSet<>();
for(Service s : services)
names.add(s.getMetadata().getName());
return names;
}

private void createServices(ProtoClient client,String namespace,List<Service> services) throws ApiException, IOException, SeldonDeploymentException
{
for(Service service : services)
{
final String serviceApiPath = "/api/v1/namespaces/{namespace}/services/{name}"
.replaceAll("\\{" + "name" + "\\}", client.getApiClient().escapeString(service.getMetadata().getName()))
.replaceAll("\\{" + "namespace" + "\\}", client.getApiClient().escapeString(namespace));
ObjectOrStatus os = client.list(Service.newBuilder(),serviceApiPath);
if (os.status != null)
{
if (os.status.getCode() == 404)
{
String serviceCreateApiPath = "/api/v1/namespaces/{namespace}/services"
.replaceAll("\\{" + "namespace" + "\\}", client.getApiClient().escapeString(namespace));
os = client.create(service, serviceCreateApiPath, "v1", "Service");
if (os.status != null)
{
logger.error("Error creating service "+ProtoBufUtils.toJson(os.status));
throw new SeldonDeploymentException("Failed to create service "+service.getMetadata().getName());
}
else
{
logger.debug("Created service:"+ProtoBufUtils.toJson(os.object));
}
}
else
{
logger.error("Error listing service:"+ProtoBufUtils.toJson(os.status));
throw new SeldonDeploymentException("Failed to list service "+service.getMetadata().getName());
}
}
else
logger.debug("No creating service as already exists "+service.getMetadata().getName());
}
}

private String getNamespace(SeldonDeployment d)
Expand Down Expand Up @@ -207,7 +244,8 @@ public void createOrReplaceSeldonDeployment(SeldonDeployment mlDep) {
String namespace = getNamespace(mlDep2);
createDeployments(client, namespace, resources.deployments);
removeDeployments(client, namespace, mlDep2, resources.deployments);
createService(client, namespace, resources.service);
createServices(client, namespace, resources.services);
removeServices(client,namespace, mlDep2, resources.services);
if (!mlDep.getSpec().equals(mlDep2.getSpec()))
{
logger.debug("Pushing updated SeldonDeployment "+mlDep2.getMetadata().getName()+" back to kubectl");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,14 @@
package io.seldon.clustermanager.k8s;

import io.seldon.clustermanager.k8s.SeldonDeploymentOperatorImpl.DeploymentResources;
import io.seldon.protos.DeploymentProtos.PredictorSpec;
import io.seldon.protos.DeploymentProtos.SeldonDeployment;

public interface SeldonDeploymentOperator {

public SeldonDeployment defaulting(SeldonDeployment mlDep);
public void validate(SeldonDeployment mlDep) throws SeldonDeploymentException;
public DeploymentResources createResources(SeldonDeployment mlDep) throws SeldonDeploymentException;
public String getKubernetesDeploymentName(String depName,String predictorName);
public String getSeldonServiceName(SeldonDeployment dep,PredictorSpec pred,String key);

}
Loading

0 comments on commit 93d875d

Please sign in to comment.