Skip to content

Commit

Permalink
Proposing Lean OpenWhisk (#3886)
Browse files Browse the repository at this point in the history
Add a lean configuration option in which the controller and invoker components are merged into a single JVM process that communicate via in-memory implementation of the MessagingProvider SPI.
  • Loading branch information
Pavel Kravchenko authored and dgrove-oss committed Dec 6, 2018
1 parent 885f228 commit d4a190c
Show file tree
Hide file tree
Showing 24 changed files with 884 additions and 32 deletions.
11 changes: 11 additions & 0 deletions ansible/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,17 @@ This is usually not necessary, however in case you want to uninstall all prereqs
ansible-playbook -i environments/<environment> prereq.yml -e mode=clean
```

### Lean Setup
To have a lean setup (no Kafka, Zookeeper and no Invokers as separate entities):

At [Deploying Using CouchDB](ansible/README.md#deploying-using-cloudant) step, replace:
```
ansible-playbook -i environments/<environment> openwhisk.yml
```
by:
```
ansible-playbook -i environments/<environment> openwhisk-lean.yml
```

### Troubleshooting
Some of the more common problems and their solution are listed here.
Expand Down
39 changes: 39 additions & 0 deletions ansible/controller-lean.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# Licensed to the Apache Software Foundation (ASF) under one or more contributor
# license agreements; and to You under the Apache License, Version 2.0.
---
# This playbook deploys Openwhisk Controllers.

- hosts: controllers
vars:
#
# host_group - usually "{{ groups['...'] }}" where '...' is what was used
# for 'hosts' above. The hostname of each host will be looked up in this
# group to assign a zero-based index. That index will be used in concert
# with 'name_prefix' below to assign a host/container name.
host_group: "{{ groups['controllers'] }}"
#
# name_prefix - a unique prefix for this set of controllers. The prefix
# will be used in combination with an index (determined using
# 'host_group' above) to name host/controllers.
name_prefix: "controller"
#
# controller_index_base - the deployment process allocates host docker
# ports to individual controllers based on their indices. This is an
# additional offset to prevent collisions between different controller
# groups. Usually 0 if only one group is being deployed, otherwise
# something like "{{ groups['firstcontrollergroup']|length }}"
controller_index_base: 0
#
# select which additional capabilities (from the controller role) need
# to be added to the controller. Plugin will override default
# configuration settings. (Plugins are found in the
# 'roles/controller/tasks' directory for now.)
controller_plugins:
# Join an akka cluster rather than running standalone akka
- "lean"

image_name: "lean"
lean: true

roles:
- controller
12 changes: 12 additions & 0 deletions ansible/openwhisk-lean.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# Licensed to the Apache Software Foundation (ASF) under one or more contributor
# license agreements; and to You under the Apache License, Version 2.0.
---
# This playbook deploys Lean Openwhisk stack.
# It assumes you have already set up your database with the respective db provider playbook (currently cloudant.yml or couchdb.yml)
# It assumes that wipe.yml have being deployed at least once

- import_playbook: controller-lean.yml

- import_playbook: edge.yml

- import_playbook: downloadcli.yml
43 changes: 25 additions & 18 deletions ansible/roles/controller/tasks/deploy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,15 @@

- import_tasks: docker_login.yml

- name: get controller name and index
- name: get controller name, index and image name
set_fact:
controller_name: "{{ name_prefix ~ host_group.index(inventory_hostname) }}"
controller_index:
"{{ (controller_index_base|int) + host_group.index(inventory_hostname) }}"
image_name: "{{ image_name | default('controller') }}"

- name: "pull the {{ docker.image.tag }} image of controller"
shell: "docker pull {{docker_registry}}{{ docker.image.prefix }}/controller:{{docker.image.tag}}"
- name: pull the {{ image_name }}:{{ docker.image.tag }} image of controller
shell: "docker pull {{docker_registry}}{{ docker.image.prefix }}/{{image_name}}:{{docker.image.tag}}"
when: docker_registry != ""
register: result
until: (result.rc == 0)
Expand Down Expand Up @@ -276,26 +277,32 @@
controller_volumes: "{{ controller_volumes|default({}) + [coverage_logs_dir+'/controller:/coverage'] }}"
when: coverage_enabled

- name: populate controller docker container parameters
set_fact:
controller_container_params:
name: "{{ controller_name }}"
image: "{{docker_registry~docker.image.prefix}}/{{ image_name }}:{{ 'cov' if (coverage_enabled) else docker.image.tag }}"
state: started
recreate: true
restart_policy: "{{ docker.restart.policy }}"
hostname: "{{ controller_name }}"
command:
/bin/sh -c
"exec /init.sh {{ controller_index }}
>> /logs/{{ controller_name }}_logs.log 2>&1"

- name: include plugins
include_tasks: "{{ item }}.yml"
with_items: "{{ controller_plugins | default([]) }}"

- name: (re)start controller
docker_container:
name: "{{ controller_name }}"
image:
"{{docker_registry~docker.image.prefix}}/controller:{{ 'cov' if (coverage_enabled) else docker.image.tag }}"
state: started
recreate: true
restart_policy: "{{ docker.restart.policy }}"
hostname: "{{ controller_name }}"
env: "{{ env }}"
volumes: "{{ controller_volumes }}"
ports: "{{ ports_to_expose }}"
command:
/bin/sh -c
"exec /init.sh {{ controller_index }}
>> /logs/{{ controller_name }}_logs.log 2>&1"
vars:
params:
env: "{{ env }}"
volumes: "{{ controller_volumes }}"
ports: "{{ ports_to_expose }}"

docker_container: "{{ controller_container_params | combine(params) }}"

- name: wait until the Controller in this host is up and running
uri:
Expand Down
56 changes: 56 additions & 0 deletions ansible/roles/controller/tasks/lean.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
# Licensed to the Apache Software Foundation (ASF) under one or more contributor
# license agreements; and to You under the Apache License, Version 2.0.
---
# This plugin will provide controller with Lean Controller parameters

- name: set inventory_hostname to invoker and save controllers data that can be changed by invoker task
set_fact:
controller_env: "{{ env }}"
controller_ports: "{{ ports_to_expose }}"
inventory_hostname: "invoker0"
invoker_index_base: 0
name_prefix: "invoker"
host_group: "{{ groups['invokers'] }}"

- name: include invoker data
include_tasks: "../invoker/tasks/deploy.yml"

- name: filter invoker ports
set_fact:
invoker_ports_to_expose: "{{ ports_to_expose | reject('search','8080') | reject('search','jmx_remote_port') | reject('search','jmx_remote_rmi_port') | list }}"

- name: add invoker ports to ports_to_expose
set_fact:
ports_to_expose: >-
{{ controller_ports }} +
{{ invoker_ports_to_expose }}
- name: save invoker volumes
set_fact:
invoker_volumes: "{{ volumes.split(',') | reject('search','/logs') | reject('search','/conf') | reject('search','/coverage') | list }}"

- name: populate volumes
set_fact:
controller_volumes: >-
{{ invoker_volumes }} +
{{ controller_volumes }}
- name: populate environment variables for LEAN controller
vars:
lean_env:
"CONFIG_whisk_spi_MessagingProvider": "org.apache.openwhisk.connector.lean.LeanMessagingProvider"
"CONFIG_whisk_spi_LoadBalancerProvider": "org.apache.openwhisk.core.loadBalancer.LeanBalancer"
set_fact:
env: "{{ env | combine(controller_env) | combine(lean_env) }}"

- name: update controller docker container params
vars:
lean_controller_params:
userns_mode: "host"
pid_mode: "host"
privileged: "yes"
set_fact:
controller_container_params: "{{ controller_container_params | combine(lean_controller_params) }}"

# VIM: let b:syntastic_yaml_yamllint_args="-c '".expand('%:p:h')."../../../yamllint.yml'"

2 changes: 2 additions & 0 deletions ansible/roles/invoker/tasks/deploy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,7 @@
volumes: "{{ volumes }}"
ports: "{{ ports_to_expose }}"
command: /bin/sh -c "exec /init.sh --id {{ invoker_index }} --uniqueName {{ invoker_index }} >> /logs/{{ invoker_name }}_logs.log 2>&1"
when: lean is undefined

- name: wait until Invoker is up and running
uri:
Expand All @@ -320,3 +321,4 @@
until: result.status == 200
retries: 12
delay: 5
when: lean is undefined
4 changes: 4 additions & 0 deletions ansible/setup.yml
Original file line number Diff line number Diff line change
Expand Up @@ -92,3 +92,7 @@
- name: generate invoker certificates
when: invoker.protocol == 'https'
local_action: shell "{{ playbook_dir }}/files/genssl.sh" "{{ invoker.ssl.cn }}" "server" "{{ playbook_dir }}/roles/invoker/files" {{ invoker.ssl.keystore.password }} {{ invoker.ssl.keyPrefix }} "generateKey"

- name: generate lean controller plugin invoker certificates
when: invoker.protocol == 'https'
local_action: shell "{{ playbook_dir }}/files/genssl.sh" "{{ invoker.ssl.cn }}" "server" "{{ playbook_dir }}/files" {{ invoker.ssl.keystore.password }} {{ invoker.ssl.keyPrefix }} "generateKey"
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.openwhisk.connector.lean

import scala.concurrent.duration._
import org.apache.openwhisk.common.Logging
import org.apache.openwhisk.core.connector.MessageConsumer
import java.util.concurrent.BlockingQueue
import java.util.concurrent.TimeUnit

class LeanConsumer(queue: BlockingQueue[Array[Byte]], override val maxPeek: Int)(implicit logging: Logging)
extends MessageConsumer {

/**
*/
override def peek(duration: FiniteDuration, retry: Int): Iterable[(String, Int, Long, Array[Byte])] = {
Option(queue.poll(duration.toMillis, TimeUnit.MILLISECONDS))
.map(record => Iterable(("", 0, 0L, record)))
.getOrElse(Iterable.empty)
}

/**
*/
override def commit(retry: Int): Unit = { /*do nothing*/ }

override def close(): Unit = {
logging.info(this, s"closing lean consumer")
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.openwhisk.connector.lean

import java.util.concurrent.BlockingQueue
import java.util.concurrent.LinkedBlockingQueue

import scala.collection.concurrent.Map
import scala.collection.concurrent.TrieMap
import scala.concurrent.duration.FiniteDuration
import scala.util.Success
import scala.util.Try

import akka.actor.ActorSystem
import org.apache.openwhisk.common.Logging
import org.apache.openwhisk.core.WhiskConfig
import org.apache.openwhisk.core.connector.MessageConsumer
import org.apache.openwhisk.core.connector.MessageProducer
import org.apache.openwhisk.core.connector.MessagingProvider
import org.apache.openwhisk.core.entity.ByteSize

/**
* A simple implementation of MessagingProvider
*/
object LeanMessagingProvider extends MessagingProvider {

val queues: Map[String, BlockingQueue[Array[Byte]]] =
new TrieMap[String, BlockingQueue[Array[Byte]]]

def getConsumer(config: WhiskConfig, groupId: String, topic: String, maxPeek: Int, maxPollInterval: FiniteDuration)(
implicit logging: Logging,
actorSystem: ActorSystem): MessageConsumer = {

var queue = queues.getOrElseUpdate(topic, new LinkedBlockingQueue[Array[Byte]]())
new LeanConsumer(queue, maxPeek)
}

def getProducer(config: WhiskConfig, maxRequestSize: Option[ByteSize] = None)(
implicit logging: Logging,
actorSystem: ActorSystem): MessageProducer =
new LeanProducer(queues)

def ensureTopic(config: WhiskConfig, topic: String, topicConfigKey: String, maxMessageBytes: Option[ByteSize] = None)(
implicit logging: Logging): Try[Unit] = {
if (queues.contains(topic)) {
Success(logging.info(this, s"topic $topic already existed"))
} else {
queues.put(topic, new LinkedBlockingQueue[Array[Byte]]())
Success(logging.info(this, s"topic $topic created"))
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.openwhisk.connector.lean

import akka.actor.ActorSystem
import scala.concurrent.Future
import org.apache.kafka.clients.producer.RecordMetadata
import org.apache.openwhisk.common.Counter
import org.apache.openwhisk.common.Logging
import org.apache.openwhisk.core.connector.Message
import org.apache.openwhisk.core.connector.MessageProducer

import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue}
import scala.collection.concurrent.Map
import java.nio.charset.StandardCharsets
import scala.concurrent.ExecutionContext

class LeanProducer(queues: Map[String, BlockingQueue[Array[Byte]]])(implicit logging: Logging, actorSystem: ActorSystem)
extends MessageProducer {

implicit val ec: ExecutionContext = actorSystem.dispatcher

override def sentCount(): Long = sentCounter.cur

/** Sends msg to topic. This is an asynchronous operation. */
override def send(topic: String, msg: Message, retry: Int = 3): Future[RecordMetadata] = {
implicit val transid = msg.transid

var queue = queues.getOrElseUpdate(topic, new LinkedBlockingQueue[Array[Byte]]())

Future {
queue.put(msg.serialize.getBytes(StandardCharsets.UTF_8))
sentCounter.next()
null
}
}

/** Closes producer. */
override def close(): Unit = {
logging.info(this, "closing lean producer")
}

private val sentCounter = new Counter()
}
1 change: 1 addition & 0 deletions core/controller/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ dependencies {
compile 'com.lightbend.akka.discovery:akka-discovery-kubernetes-api_2.12:0.11.0'
compile 'com.lightbend.akka.discovery:akka-discovery-marathon-api_2.12:0.11.0'
compile project(':common:scala')
compile project(':core:invoker')
scoverage gradle.scoverage.deps
}

Expand Down
Loading

0 comments on commit d4a190c

Please sign in to comment.