diff --git a/ansible/README.md b/ansible/README.md index fb7145c8aa3..eeca95ff2cf 100644 --- a/ansible/README.md +++ b/ansible/README.md @@ -280,17 +280,6 @@ This is usually not necessary, however in case you want to uninstall all prereqs ansible-playbook -i environments/ prereq.yml -e mode=clean ``` -### Lean Setup -To have a lean setup (no Kafka, Zookeeper and no Invokers as separate entities): - -At [Deploying Using CouchDB](ansible/README.md#deploying-using-cloudant) step, replace: -``` -ansible-playbook -i environments/ openwhisk.yml -``` -by: -``` -ansible-playbook -i environments/ openwhisk-lean.yml -``` ### Troubleshooting Some of the more common problems and their solution are listed here. diff --git a/ansible/controller-lean.yml b/ansible/controller-lean.yml deleted file mode 100644 index 0aa588ddea4..00000000000 --- a/ansible/controller-lean.yml +++ /dev/null @@ -1,39 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more contributor -# license agreements; and to You under the Apache License, Version 2.0. ---- -# This playbook deploys Openwhisk Controllers. - -- hosts: controllers - vars: - # - # host_group - usually "{{ groups['...'] }}" where '...' is what was used - # for 'hosts' above. The hostname of each host will be looked up in this - # group to assign a zero-based index. That index will be used in concert - # with 'name_prefix' below to assign a host/container name. - host_group: "{{ groups['controllers'] }}" - # - # name_prefix - a unique prefix for this set of controllers. The prefix - # will be used in combination with an index (determined using - # 'host_group' above) to name host/controllers. - name_prefix: "controller" - # - # controller_index_base - the deployment process allocates host docker - # ports to individual controllers based on their indices. This is an - # additional offset to prevent collisions between different controller - # groups. Usually 0 if only one group is being deployed, otherwise - # something like "{{ groups['firstcontrollergroup']|length }}" - controller_index_base: 0 - # - # select which additional capabilities (from the controller role) need - # to be added to the controller. Plugin will override default - # configuration settings. (Plugins are found in the - # 'roles/controller/tasks' directory for now.) - controller_plugins: - # Join an akka cluster rather than running standalone akka - - "lean" - - image_name: "lean" - lean: true - - roles: - - controller diff --git a/ansible/openwhisk-lean.yml b/ansible/openwhisk-lean.yml deleted file mode 100644 index 9dd87cf25c2..00000000000 --- a/ansible/openwhisk-lean.yml +++ /dev/null @@ -1,12 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more contributor -# license agreements; and to You under the Apache License, Version 2.0. ---- -# This playbook deploys Lean Openwhisk stack. -# It assumes you have already set up your database with the respective db provider playbook (currently cloudant.yml or couchdb.yml) -# It assumes that wipe.yml have being deployed at least once - -- import_playbook: controller-lean.yml - -- import_playbook: edge.yml - -- import_playbook: downloadcli.yml diff --git a/ansible/roles/controller/tasks/deploy.yml b/ansible/roles/controller/tasks/deploy.yml index 4ec8a113734..0fb2ebbdf13 100644 --- a/ansible/roles/controller/tasks/deploy.yml +++ b/ansible/roles/controller/tasks/deploy.yml @@ -6,15 +6,14 @@ - import_tasks: docker_login.yml -- name: get controller name, index and image name +- name: get controller name and index set_fact: controller_name: "{{ name_prefix ~ host_group.index(inventory_hostname) }}" controller_index: "{{ (controller_index_base|int) + host_group.index(inventory_hostname) }}" - image_name: "{{ image_name | default('controller') }}" -- name: pull the {{ image_name }}:{{ docker.image.tag }} image of controller - shell: "docker pull {{docker_registry}}{{ docker.image.prefix }}/{{image_name}}:{{docker.image.tag}}" +- name: "pull the {{ docker.image.tag }} image of controller" + shell: "docker pull {{docker_registry}}{{ docker.image.prefix }}/controller:{{docker.image.tag}}" when: docker_registry != "" register: result until: (result.rc == 0) @@ -277,32 +276,26 @@ controller_volumes: "{{ controller_volumes|default({}) + [coverage_logs_dir+'/controller:/coverage'] }}" when: coverage_enabled -- name: populate controller docker container parameters - set_fact: - controller_container_params: - name: "{{ controller_name }}" - image: "{{docker_registry~docker.image.prefix}}/{{ image_name }}:{{ 'cov' if (coverage_enabled) else docker.image.tag }}" - state: started - recreate: true - restart_policy: "{{ docker.restart.policy }}" - hostname: "{{ controller_name }}" - command: - /bin/sh -c - "exec /init.sh {{ controller_index }} - >> /logs/{{ controller_name }}_logs.log 2>&1" - - name: include plugins include_tasks: "{{ item }}.yml" with_items: "{{ controller_plugins | default([]) }}" - name: (re)start controller - vars: - params: - env: "{{ env }}" - volumes: "{{ controller_volumes }}" - ports: "{{ ports_to_expose }}" - - docker_container: "{{ controller_container_params | combine(params) }}" + docker_container: + name: "{{ controller_name }}" + image: + "{{docker_registry~docker.image.prefix}}/controller:{{ 'cov' if (coverage_enabled) else docker.image.tag }}" + state: started + recreate: true + restart_policy: "{{ docker.restart.policy }}" + hostname: "{{ controller_name }}" + env: "{{ env }}" + volumes: "{{ controller_volumes }}" + ports: "{{ ports_to_expose }}" + command: + /bin/sh -c + "exec /init.sh {{ controller_index }} + >> /logs/{{ controller_name }}_logs.log 2>&1" - name: wait until the Controller in this host is up and running uri: diff --git a/ansible/roles/controller/tasks/lean.yml b/ansible/roles/controller/tasks/lean.yml deleted file mode 100644 index 6dc479908b0..00000000000 --- a/ansible/roles/controller/tasks/lean.yml +++ /dev/null @@ -1,56 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more contributor -# license agreements; and to You under the Apache License, Version 2.0. ---- -# This plugin will provide controller with Lean Controller parameters - -- name: set inventory_hostname to invoker and save controllers data that can be changed by invoker task - set_fact: - controller_env: "{{ env }}" - controller_ports: "{{ ports_to_expose }}" - inventory_hostname: "invoker0" - invoker_index_base: 0 - name_prefix: "invoker" - host_group: "{{ groups['invokers'] }}" - -- name: include invoker data - include_tasks: "../invoker/tasks/deploy.yml" - -- name: filter invoker ports - set_fact: - invoker_ports_to_expose: "{{ ports_to_expose | reject('search','8080') | reject('search','jmx_remote_port') | reject('search','jmx_remote_rmi_port') | list }}" - -- name: add invoker ports to ports_to_expose - set_fact: - ports_to_expose: >- - {{ controller_ports }} + - {{ invoker_ports_to_expose }} - -- name: save invoker volumes - set_fact: - invoker_volumes: "{{ volumes.split(',') | reject('search','/logs') | reject('search','/conf') | reject('search','/coverage') | list }}" - -- name: populate volumes - set_fact: - controller_volumes: >- - {{ invoker_volumes }} + - {{ controller_volumes }} - -- name: populate environment variables for LEAN controller - vars: - lean_env: - "CONFIG_whisk_spi_MessagingProvider": "org.apache.openwhisk.connector.lean.LeanMessagingProvider" - "CONFIG_whisk_spi_LoadBalancerProvider": "org.apache.openwhisk.core.loadBalancer.LeanBalancer" - set_fact: - env: "{{ env | combine(controller_env) | combine(lean_env) }}" - -- name: update controller docker container params - vars: - lean_controller_params: - userns_mode: "host" - pid_mode: "host" - privileged: "yes" - set_fact: - controller_container_params: "{{ controller_container_params | combine(lean_controller_params) }}" - -# VIM: let b:syntastic_yaml_yamllint_args="-c '".expand('%:p:h')."../../../yamllint.yml'" - diff --git a/ansible/roles/invoker/tasks/deploy.yml b/ansible/roles/invoker/tasks/deploy.yml index f5a9acb2049..32acfc79b93 100644 --- a/ansible/roles/invoker/tasks/deploy.yml +++ b/ansible/roles/invoker/tasks/deploy.yml @@ -309,7 +309,6 @@ volumes: "{{ volumes }}" ports: "{{ ports_to_expose }}" command: /bin/sh -c "exec /init.sh --id {{ invoker_index }} --uniqueName {{ invoker_index }} >> /logs/{{ invoker_name }}_logs.log 2>&1" - when: lean is undefined - name: wait until Invoker is up and running uri: @@ -321,4 +320,3 @@ until: result.status == 200 retries: 12 delay: 5 - when: lean is undefined diff --git a/ansible/setup.yml b/ansible/setup.yml index f4ea323b3a8..a550389939a 100644 --- a/ansible/setup.yml +++ b/ansible/setup.yml @@ -92,7 +92,3 @@ - name: generate invoker certificates when: invoker.protocol == 'https' local_action: shell "{{ playbook_dir }}/files/genssl.sh" "{{ invoker.ssl.cn }}" "server" "{{ playbook_dir }}/roles/invoker/files" {{ invoker.ssl.keystore.password }} {{ invoker.ssl.keyPrefix }} "generateKey" - - - name: generate lean controller plugin invoker certificates - when: invoker.protocol == 'https' - local_action: shell "{{ playbook_dir }}/files/genssl.sh" "{{ invoker.ssl.cn }}" "server" "{{ playbook_dir }}/files" {{ invoker.ssl.keystore.password }} {{ invoker.ssl.keyPrefix }} "generateKey" diff --git a/common/scala/src/main/scala/org/apache/openwhisk/connector/lean/LeanConsumer.scala b/common/scala/src/main/scala/org/apache/openwhisk/connector/lean/LeanConsumer.scala deleted file mode 100644 index eec420ff07c..00000000000 --- a/common/scala/src/main/scala/org/apache/openwhisk/connector/lean/LeanConsumer.scala +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.openwhisk.connector.lean - -import scala.concurrent.duration._ -import org.apache.openwhisk.common.Logging -import org.apache.openwhisk.core.connector.MessageConsumer -import java.util.concurrent.BlockingQueue -import java.util.concurrent.TimeUnit - -class LeanConsumer(queue: BlockingQueue[Array[Byte]], override val maxPeek: Int)(implicit logging: Logging) - extends MessageConsumer { - - /** - */ - override def peek(duration: FiniteDuration, retry: Int): Iterable[(String, Int, Long, Array[Byte])] = { - Option(queue.poll(duration.toMillis, TimeUnit.MILLISECONDS)) - .map(record => Iterable(("", 0, 0L, record))) - .getOrElse(Iterable.empty) - } - - /** - */ - override def commit(retry: Int): Unit = { /*do nothing*/ } - - override def close(): Unit = { - logging.info(this, s"closing lean consumer") - } -} diff --git a/common/scala/src/main/scala/org/apache/openwhisk/connector/lean/LeanMessagingProvider.scala b/common/scala/src/main/scala/org/apache/openwhisk/connector/lean/LeanMessagingProvider.scala deleted file mode 100644 index f13f5d7a929..00000000000 --- a/common/scala/src/main/scala/org/apache/openwhisk/connector/lean/LeanMessagingProvider.scala +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.openwhisk.connector.lean - -import java.util.concurrent.BlockingQueue -import java.util.concurrent.LinkedBlockingQueue - -import scala.collection.concurrent.Map -import scala.collection.concurrent.TrieMap -import scala.concurrent.duration.FiniteDuration -import scala.util.Success -import scala.util.Try - -import akka.actor.ActorSystem -import org.apache.openwhisk.common.Logging -import org.apache.openwhisk.core.WhiskConfig -import org.apache.openwhisk.core.connector.MessageConsumer -import org.apache.openwhisk.core.connector.MessageProducer -import org.apache.openwhisk.core.connector.MessagingProvider -import org.apache.openwhisk.core.entity.ByteSize - -/** - * A simple implementation of MessagingProvider - */ -object LeanMessagingProvider extends MessagingProvider { - - val queues: Map[String, BlockingQueue[Array[Byte]]] = - new TrieMap[String, BlockingQueue[Array[Byte]]] - - def getConsumer(config: WhiskConfig, groupId: String, topic: String, maxPeek: Int, maxPollInterval: FiniteDuration)( - implicit logging: Logging, - actorSystem: ActorSystem): MessageConsumer = { - - var queue = queues.getOrElseUpdate(topic, new LinkedBlockingQueue[Array[Byte]]()) - new LeanConsumer(queue, maxPeek) - } - - def getProducer(config: WhiskConfig, maxRequestSize: Option[ByteSize] = None)( - implicit logging: Logging, - actorSystem: ActorSystem): MessageProducer = - new LeanProducer(queues) - - def ensureTopic(config: WhiskConfig, topic: String, topicConfigKey: String, maxMessageBytes: Option[ByteSize] = None)( - implicit logging: Logging): Try[Unit] = { - if (queues.contains(topic)) { - Success(logging.info(this, s"topic $topic already existed")) - } else { - queues.put(topic, new LinkedBlockingQueue[Array[Byte]]()) - Success(logging.info(this, s"topic $topic created")) - } - } -} diff --git a/common/scala/src/main/scala/org/apache/openwhisk/connector/lean/LeanProducer.scala b/common/scala/src/main/scala/org/apache/openwhisk/connector/lean/LeanProducer.scala deleted file mode 100644 index 1d35119c2f8..00000000000 --- a/common/scala/src/main/scala/org/apache/openwhisk/connector/lean/LeanProducer.scala +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.openwhisk.connector.lean - -import akka.actor.ActorSystem -import scala.concurrent.Future -import org.apache.kafka.clients.producer.RecordMetadata -import org.apache.openwhisk.common.Counter -import org.apache.openwhisk.common.Logging -import org.apache.openwhisk.core.connector.Message -import org.apache.openwhisk.core.connector.MessageProducer - -import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue} -import scala.collection.concurrent.Map -import java.nio.charset.StandardCharsets -import scala.concurrent.ExecutionContext - -class LeanProducer(queues: Map[String, BlockingQueue[Array[Byte]]])(implicit logging: Logging, actorSystem: ActorSystem) - extends MessageProducer { - - implicit val ec: ExecutionContext = actorSystem.dispatcher - - override def sentCount(): Long = sentCounter.cur - - /** Sends msg to topic. This is an asynchronous operation. */ - override def send(topic: String, msg: Message, retry: Int = 3): Future[RecordMetadata] = { - implicit val transid = msg.transid - - var queue = queues.getOrElseUpdate(topic, new LinkedBlockingQueue[Array[Byte]]()) - - Future { - queue.put(msg.serialize.getBytes(StandardCharsets.UTF_8)) - sentCounter.next() - null - } - } - - /** Closes producer. */ - override def close(): Unit = { - logging.info(this, "closing lean producer") - } - - private val sentCounter = new Counter() -} diff --git a/core/controller/build.gradle b/core/controller/build.gradle index 00591570a1d..243c77609ac 100644 --- a/core/controller/build.gradle +++ b/core/controller/build.gradle @@ -43,7 +43,6 @@ dependencies { compile 'com.lightbend.akka.discovery:akka-discovery-kubernetes-api_2.12:0.11.0' compile 'com.lightbend.akka.discovery:akka-discovery-marathon-api_2.12:0.11.0' compile project(':common:scala') - compile project(':core:invoker') scoverage gradle.scoverage.deps } diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/LeanBalancer.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/LeanBalancer.scala deleted file mode 100644 index 403cca30b63..00000000000 --- a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/LeanBalancer.scala +++ /dev/null @@ -1,276 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.openwhisk.core.loadBalancer - -import java.nio.charset.StandardCharsets -import java.util.concurrent.atomic.LongAdder - -import akka.actor.{ActorSystem, Props} -import akka.event.Logging.InfoLevel -import akka.stream.ActorMaterializer -import org.apache.kafka.clients.producer.RecordMetadata -import pureconfig._ -import org.apache.openwhisk.spi.SpiLoader -import org.apache.openwhisk.core.entity._ -import org.apache.openwhisk.core.entity.size._ -import org.apache.openwhisk.common.LoggingMarkers._ -import org.apache.openwhisk.common._ -import org.apache.openwhisk.core.WhiskConfig._ -import org.apache.openwhisk.core.connector._ -import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig} -import scala.collection.concurrent.TrieMap -import scala.concurrent.duration._ -import scala.concurrent.{ExecutionContext, Future, Promise} -import scala.util.{Failure, Success} -import org.apache.openwhisk.core.invoker.InvokerReactive -import org.apache.openwhisk.utils.ExecutionContextFactory -import org.apache.openwhisk.core.containerpool.ContainerPoolConfig - -/** - * Lean loadbalancer implemetation. - * - * Communicates with Invoker directly without Kafka in the middle. Invoker does not exist as a separate entity, it is built together with Controller - * Uses LeanMessagingProvider to use in-memory queue instead of Kafka - */ -class LeanBalancer(config: WhiskConfig, controllerInstance: ControllerInstanceId)(implicit val actorSystem: ActorSystem, - logging: Logging, - materializer: ActorMaterializer) - extends LoadBalancer { - - private implicit val executionContext: ExecutionContext = actorSystem.dispatcher - - private val lbConfig = loadConfigOrThrow[ShardingContainerPoolBalancerConfig](ConfigKeys.loadbalancer) - - /** State related to invocations and throttling */ - private val activations = TrieMap[ActivationId, ActivationEntry]() - private val activationsPerNamespace = TrieMap[UUID, LongAdder]() - private val totalActivations = new LongAdder() - private val totalActivationMemory = new LongAdder() - - actorSystem.scheduler.schedule(0.seconds, 10.seconds) { - MetricEmitter.emitHistogramMetric(LOADBALANCER_ACTIVATIONS_INFLIGHT(controllerInstance), totalActivations.longValue) - MetricEmitter.emitHistogramMetric(LOADBALANCER_MEMORY_INFLIGHT(controllerInstance), totalActivationMemory.longValue) - } - - /** Loadbalancer interface methods */ - override def invokerHealth(): Future[IndexedSeq[InvokerHealth]] = Future.successful(IndexedSeq.empty[InvokerHealth]) - override def activeActivationsFor(namespace: UUID): Future[Int] = - Future.successful(activationsPerNamespace.get(namespace).map(_.intValue()).getOrElse(0)) - override def totalActiveActivations: Future[Int] = Future.successful(totalActivations.intValue()) - override def clusterSize: Int = 1 - - val poolConfig: ContainerPoolConfig = loadConfigOrThrow[ContainerPoolConfig](ConfigKeys.containerPool) - val invokerName = InvokerInstanceId(0, None, None, poolConfig.userMemory) - val controllerName = ControllerInstanceId("controller-lean") - - /** 1. Publish a message to the loadbalancer */ - override def publish(action: ExecutableWhiskActionMetaData, msg: ActivationMessage)( - implicit transid: TransactionId): Future[Future[Either[ActivationId, WhiskActivation]]] = { - val entry = setupActivation(msg, action, invokerName) - sendActivationToInvoker(messageProducer, msg, invokerName).map { _ => - entry.promise.future - } - } - - /** 2. Update local state with the to be executed activation */ - private def setupActivation(msg: ActivationMessage, - action: ExecutableWhiskActionMetaData, - instance: InvokerInstanceId): ActivationEntry = { - - totalActivations.increment() - totalActivationMemory.add(action.limits.memory.megabytes) - activationsPerNamespace.getOrElseUpdate(msg.user.namespace.uuid, new LongAdder()).increment() - val timeout = (action.limits.timeout.duration.max(TimeLimit.STD_DURATION) * lbConfig.timeoutFactor) + 1.minute - // Install a timeout handler for the catastrophic case where an active ack is not received at all - // (because say an invoker is down completely, or the connection to the message bus is disrupted) or when - // the active ack is significantly delayed (possibly dues to long queues but the subject should not be penalized); - // in this case, if the activation handler is still registered, remove it and update the books. - activations.getOrElseUpdate( - msg.activationId, { - val timeoutHandler = actorSystem.scheduler.scheduleOnce(timeout) { - processCompletion(msg.activationId, msg.transid, forced = true, isSystemError = false, invoker = instance) - } - - // please note: timeoutHandler.cancel must be called on all non-timeout paths, e.g. Success - ActivationEntry( - msg.activationId, - msg.user.namespace.uuid, - instance, - action.limits.memory.megabytes.MB, - action.limits.concurrency.maxConcurrent, - action.fullyQualifiedName(true), - timeoutHandler, - Promise[Either[ActivationId, WhiskActivation]]()) - }) - } - - private val messagingProvider = SpiLoader.get[MessagingProvider] - private val messageProducer = messagingProvider.getProducer(config) - - /** 3. Send the activation to the invoker */ - private def sendActivationToInvoker(producer: MessageProducer, - msg: ActivationMessage, - invoker: InvokerInstanceId): Future[RecordMetadata] = { - implicit val transid: TransactionId = msg.transid - - val topic = s"invoker${invoker.toInt}" - - MetricEmitter.emitCounterMetric(LoggingMarkers.LOADBALANCER_ACTIVATION_START) - val start = transid.started( - this, - LoggingMarkers.CONTROLLER_KAFKA, - s"posting topic '$topic' with activation id '${msg.activationId}'", - logLevel = InfoLevel) - - producer.send(topic, msg).andThen { - case Success(status) => - transid.finished(this, start, s"posted to $topic", logLevel = InfoLevel) - case Failure(_) => - transid.failed(this, start, s"error on posting to topic $topic") - } - } - - /** - * Subscribes to active acks (completion messages from the invokers), and - * registers a handler for received active acks from invokers. - */ - private val activeAckTopic = s"completed${controllerInstance.asString}" - private val maxActiveAcksPerPoll = 128 - private val activeAckPollDuration = 1.second - private val activeAckConsumer = - messagingProvider.getConsumer(config, activeAckTopic, activeAckTopic, maxPeek = maxActiveAcksPerPoll) - - private val activationFeed = actorSystem.actorOf(Props { - new MessageFeed( - "activeack", - logging, - activeAckConsumer, - maxActiveAcksPerPoll, - activeAckPollDuration, - processAcknowledgement) - }) - - /** 4. Get the acknowledgement message and parse it */ - private def processAcknowledgement(bytes: Array[Byte]): Future[Unit] = Future { - val raw = new String(bytes, StandardCharsets.UTF_8) - AcknowledegmentMessage.parse(raw) match { - case Success(m: CompletionMessage) => - processCompletion( - m.activationId, - m.transid, - forced = false, - isSystemError = m.isSystemError, - invoker = m.invoker) - activationFeed ! MessageFeed.Processed - - case Success(m: ResultMessage) => - processResult(m.response, m.transid) - activationFeed ! MessageFeed.Processed - - case Failure(t) => - activationFeed ! MessageFeed.Processed - logging.error(this, s"failed processing message: $raw") - - case _ => - activationFeed ! MessageFeed.Processed - logging.error(this, s"Unexpected Acknowledgment message received by loadbalancer: $raw") - } - } - - /** 5. Process the result ack and return it to the user */ - private def processResult(response: Either[ActivationId, WhiskActivation], tid: TransactionId): Unit = { - val aid = response.fold(l => l, r => r.activationId) - - // Resolve the promise to send the result back to the user - // The activation will be removed from `activations`-map later, when we receive the completion message, because the - // slot of the invoker is not yet free for new activations. - activations.get(aid).map { entry => - entry.promise.trySuccess(response) - } - logging.info(this, s"received result ack for '$aid'")(tid) - } - - /** Process the completion ack and update the state */ - private def processCompletion(aid: ActivationId, - tid: TransactionId, - forced: Boolean, - isSystemError: Boolean, - invoker: InvokerInstanceId): Unit = { - - val invocationResult = if (forced) { - InvocationFinishedResult.Timeout - } else { - // If the response contains a system error, report that, otherwise report Success - // Left generally is considered a Success, since that could be a message not fitting into Kafka - if (isSystemError) { - InvocationFinishedResult.SystemError - } else { - InvocationFinishedResult.Success - } - } - - activations.remove(aid) match { - case Some(entry) => - totalActivations.decrement() - totalActivationMemory.add(entry.memory.toMB * (-1)) - activationsPerNamespace.get(entry.namespaceId).foreach(_.decrement()) - - if (!forced) { - entry.timeoutHandler.cancel() - entry.promise.trySuccess(Left(aid)) - } else { - entry.promise.tryFailure(new Throwable("no active ack received")) - } - - logging.info(this, s"${if (!forced) "received" else "forced"} active ack for '$aid'")(tid) - // Active acks that are received here are strictly from user actions - health actions are not part of - // the load balancer's activation map. Inform the invoker pool supervisor of the user action completion. - case None if !forced => - // the entry has already been removed but we receive an active ack for this activation Id. - // This happens for health actions, because they don't have an entry in Loadbalancerdata or - // for activations that already timed out. - logging.info(this, s"received active ack for '$aid' which has no entry")(tid) - case None => - // the entry has already been removed by an active ack. This part of the code is reached by the timeout. - // As the active ack is already processed we don't have to do anything here. - logging.info(this, s"forced active ack for '$aid' which has no entry")(tid) - } - } - - private def getInvoker() { - implicit val ec = ExecutionContextFactory.makeCachedThreadPoolExecutionContext() - val actorSystema: ActorSystem = - ActorSystem(name = "invoker-actor-system", defaultExecutionContext = Some(ec)) - val invoker = new InvokerReactive(config, invokerName, messageProducer)(actorSystema, implicitly) - } - - getInvoker() -} - -object LeanBalancer extends LoadBalancerProvider { - - override def instance(whiskConfig: WhiskConfig, instance: ControllerInstanceId)( - implicit actorSystem: ActorSystem, - logging: Logging, - materializer: ActorMaterializer): LoadBalancer = new LeanBalancer(whiskConfig, instance) - - def requiredProperties = - Map(servicePort -> 8080.toString(), runtimesRegistry -> "") ++ - ExecManifest.requiredProperties ++ - wskApiHost -} diff --git a/core/lean/Dockerfile b/core/lean/Dockerfile deleted file mode 100644 index e846f0106e0..00000000000 --- a/core/lean/Dockerfile +++ /dev/null @@ -1,55 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more contributor -# license agreements. See the NOTICE file distributed with this work for additional -# information regarding copyright ownership. The ASF licenses this file to you -# under the Apache License, Version 2.0 (the # "License"); you may not use this -# file except in compliance with the License. You may obtain a copy of the License -# at: -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software distributed -# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR -# CONDITIONS OF ANY KIND, either express or implied. See the License for the -# specific language governing permissions and limitations under the License. -# - -FROM scala - -ENV UID=1001 \ - NOT_ROOT_USER=owuser -ENV SWAGGER_UI_DOWNLOAD_SHA256=3d7ef5ddc59e10f132fe99771498f0f1ba7a2cbfb9585f9863d4191a574c96e7 \ - SWAGGER_UI_VERSION=3.6.0 - -ENV DOCKER_VERSION=1.12.0 \ - DOCKER_DOWNLOAD_SHA256=3dd07f65ea4a7b4c8829f311ab0213bca9ac551b5b24706f3e79a97e22097f8b - -RUN apk add --update openssl - -# Uncomment to fetch latest version of docker instead: RUN wget -qO- https://get.docker.com | sh -# Install docker client -RUN curl -sSL -o docker-${DOCKER_VERSION}.tgz https://get.docker.com/builds/Linux/x86_64/docker-${DOCKER_VERSION}.tgz && \ -echo "${DOCKER_DOWNLOAD_SHA256} docker-${DOCKER_VERSION}.tgz" | sha256sum -c - && \ -tar --strip-components 1 -xvzf docker-${DOCKER_VERSION}.tgz -C /usr/bin docker/docker && \ -tar --strip-components 1 -xvzf docker-${DOCKER_VERSION}.tgz -C /usr/bin docker/docker-runc && \ -rm -f docker-${DOCKER_VERSION}.tgz && \ -chmod +x /usr/bin/docker && \ -chmod +x /usr/bin/docker-runc - -# Install swagger-ui -RUN curl -sSL -o swagger-ui-v${SWAGGER_UI_VERSION}.tar.gz --no-verbose https://github.com/swagger-api/swagger-ui/archive/v${SWAGGER_UI_VERSION}.tar.gz && \ - echo "${SWAGGER_UI_DOWNLOAD_SHA256} swagger-ui-v${SWAGGER_UI_VERSION}.tar.gz" | sha256sum -c - && \ - mkdir swagger-ui && \ - tar zxf swagger-ui-v${SWAGGER_UI_VERSION}.tar.gz -C /swagger-ui --strip-components=2 swagger-ui-${SWAGGER_UI_VERSION}/dist && \ - rm swagger-ui-v${SWAGGER_UI_VERSION}.tar.gz && \ - sed -i s#http://petstore.swagger.io/v2/swagger.json#/api/v1/api-docs#g /swagger-ui/index.html - -# Copy app jars -ADD build/distributions/lean.tar / - -COPY init.sh / -RUN chmod +x init.sh -RUN adduser -D -u ${UID} -h /home/${NOT_ROOT_USER} -s /bin/bash ${NOT_ROOT_USER} - -EXPOSE 8080 -CMD ["./init.sh", "0"] diff --git a/core/lean/Dockerfile.cov b/core/lean/Dockerfile.cov deleted file mode 100644 index e8d00afbcdf..00000000000 --- a/core/lean/Dockerfile.cov +++ /dev/null @@ -1,16 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more contributor -# license agreements; and to You under the Apache License, Version 2.0. - -FROM lean - -ARG OW_ROOT_DIR - -USER root -RUN mkdir -p /coverage/common && \ - mkdir -p /coverage/controller && \ - mkdir -p "${OW_ROOT_DIR}/common/scala/build" && \ - mkdir -p "${OW_ROOT_DIR}/core/controller/build" && \ - ln -s /coverage/common "${OW_ROOT_DIR}/common/scala/build/scoverage" && \ - ln -s /coverage/controller "${OW_ROOT_DIR}/core/controller/build/scoverage" - -COPY build/tmp/docker-coverage /controller/ diff --git a/core/lean/build.gradle b/core/lean/build.gradle deleted file mode 100644 index 0d67778d4c9..00000000000 --- a/core/lean/build.gradle +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -apply plugin: 'scala' -apply plugin: 'application' -apply plugin: 'eclipse' -apply plugin: 'maven' -apply plugin: 'org.scoverage' - -ext.dockerImageName = 'lean' -apply from: '../../gradle/docker.gradle' -distDocker.dependsOn ':common:scala:distDocker', 'distTar' - -project.archivesBaseName = "openwhisk-lean" - -ext.coverageJars = [ - "${buildDir}/libs/${project.archivesBaseName}-$version-scoverage.jar", - "${project(':common:scala').buildDir.absolutePath}/libs/openwhisk-common-$version-scoverage.jar" -] -distDockerCoverage.dependsOn ':common:scala:jarScoverage', 'jarScoverage' - -repositories { - mavenCentral() -} - -dependencies { - compile "org.scala-lang:scala-library:${gradle.scala.version}" - compile 'com.lightbend.akka.management:akka-management-cluster-bootstrap_2.12:0.11.0' - compile 'com.lightbend.akka.discovery:akka-discovery-kubernetes-api_2.12:0.11.0' - compile 'com.lightbend.akka.discovery:akka-discovery-marathon-api_2.12:0.11.0' - compile project(':common:scala') - compile project(':core:controller') - scoverage gradle.scoverage.deps -} - -tasks.withType(ScalaCompile) { - scalaCompileOptions.additionalParameters = gradle.scala.compileFlags -} - -mainClassName = "org.apache.openwhisk.core.controller.Controller" -applicationDefaultJvmArgs = ["-Djava.security.egd=file:/dev/./urandom"] diff --git a/core/lean/init.sh b/core/lean/init.sh deleted file mode 100644 index 025318dcbd5..00000000000 --- a/core/lean/init.sh +++ /dev/null @@ -1,10 +0,0 @@ -#!/bin/bash - -# Licensed to the Apache Software Foundation (ASF) under one or more contributor -# license agreements; and to You under the Apache License, Version 2.0. - -./copyJMXFiles.sh - -export LEAN_OPTS="$CONTROLLER_OPTS $(./transformEnvironment.sh)" - -exec lean/bin/lean "$@" diff --git a/settings.gradle b/settings.gradle index 3898b1977b4..39ab2700ecb 100644 --- a/settings.gradle +++ b/settings.gradle @@ -19,7 +19,6 @@ include 'common:scala' include 'core:controller' include 'core:invoker' -include 'core:lean' include 'tests' include 'tests:performance:gatling_tests' diff --git a/tests/build.gradle b/tests/build.gradle index 11557078230..e427d723ba6 100644 --- a/tests/build.gradle +++ b/tests/build.gradle @@ -51,20 +51,6 @@ def leanExcludes = [ '**/MaxActionDurationTests*', ] -def systemIncludes = [ - "org/apache/openwhisk/core/limits/**", - "org/apache/openwhisk/core/admin/**", - "org/apache/openwhisk/core/cli/test/**", - "org/apache/openwhisk/core/apigw/actions/test/**", - "org/apache/openwhisk/core/database/test/*CacheConcurrencyTests*", - "org/apache/openwhisk/core/controller/test/*ControllerApiTests*", - "apigw/healthtests/**", - "ha/**", - "services/**", - "system/basic/**", - "system/rest/**", -] - ext.testSets = [ "REQUIRE_ONLY_DB" : [ "includes" : [ @@ -81,7 +67,19 @@ ext.testSets = [ ] ], "REQUIRE_SYSTEM" : [ - "includes" : systemIncludes, + "includes" : [ + "org/apache/openwhisk/core/limits/**", + "org/apache/openwhisk/core/admin/**", + "org/apache/openwhisk/core/cli/test/**", + "org/apache/openwhisk/core/apigw/actions/test/**", + "org/apache/openwhisk/core/database/test/*CacheConcurrencyTests*", + "org/apache/openwhisk/core/controller/test/*ControllerApiTests*", + "apigw/healthtests/**", + "ha/**", + "services/**", + "system/basic/**", + "system/rest/**", + ], "excludes": [ "system/basic/WskMultiRuntimeTests*" ] @@ -100,16 +98,6 @@ ext.testSets = [ "includes" : [ "system/basic/**" ] - ], - "REQUIRE_LEAN_SYSTEM" : [ - "includes" : systemIncludes, - - // Lean OpenWhisk doesn't use Kafka and has no Kafka installed as part of the setup. - // Tests suit below validating KafkaConnector so have to be excluded for Lean System tests - "excludes" : [ - "**/*KafkaConnectorTests*", - "system/basic/WskMultiRuntimeTests*" - ] ] ] diff --git a/tests/performance/preparation/deploy-lean.sh b/tests/performance/preparation/deploy-lean.sh deleted file mode 100755 index 0d5ce868f89..00000000000 --- a/tests/performance/preparation/deploy-lean.sh +++ /dev/null @@ -1,38 +0,0 @@ -#!/bin/sh -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -set -e -SCRIPTDIR="$(cd "$(dirname "$0")"; pwd)" -ROOTDIR="$SCRIPTDIR/../../.." - -# Build Openwhisk -cd $ROOTDIR -TERM=dumb ./gradlew distDocker -PdockerImagePrefix=testing $GRADLE_PROJS_SKIP - -# Deploy Openwhisk -cd $ROOTDIR/ansible -ANSIBLE_CMD="$ANSIBLE_CMD -e limit_invocations_per_minute=999999 -e limit_invocations_concurrent=999999 -e controller_client_auth=false -e userLogs_spi=\"org.apache.openwhisk.core.containerpool.logging.LogDriverLogStoreProvider\"" - -$ANSIBLE_CMD setup.yml - -$ANSIBLE_CMD prereq.yml -$ANSIBLE_CMD couchdb.yml -$ANSIBLE_CMD initdb.yml -$ANSIBLE_CMD wipe.yml - -$ANSIBLE_CMD controller-lean.yml -$ANSIBLE_CMD edge.yml diff --git a/tests/src/test/scala/org/apache/openwhisk/core/cli/test/BaseApiGwTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/cli/test/BaseApiGwTests.scala index 6bad4bb35db..1a74f01fa4e 100644 --- a/tests/src/test/scala/org/apache/openwhisk/core/cli/test/BaseApiGwTests.scala +++ b/tests/src/test/scala/org/apache/openwhisk/core/cli/test/BaseApiGwTests.scala @@ -93,9 +93,6 @@ abstract class BaseApiGwTests extends TestHelpers with WskTestHelpers with Befor val wskprops = WskProps(token = "SOME TOKEN") wskprops.writeFile(cliWskPropsFile) println(s"wsk temporary props file created here: ${cliWskPropsFile.getCanonicalPath()}") - val halfThrottleTime = 60.seconds.toMillis / 2 - println(s"Waiting ${halfThrottleTime} milliseconds to settle the throttle from previous tests") - Thread.sleep(halfThrottleTime) } /* diff --git a/tools/travis/distDocker-lean.sh b/tools/travis/distDocker-lean.sh deleted file mode 100755 index f5bf4c693b7..00000000000 --- a/tools/travis/distDocker-lean.sh +++ /dev/null @@ -1,32 +0,0 @@ -#!/bin/bash -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -set -e - -# Build script for Travis-CI. - -SECONDS=0 -SCRIPTDIR=$(cd $(dirname "$0") && pwd) -ROOTDIR="$SCRIPTDIR/../.." - -cd $ROOTDIR -TERM=dumb ./gradlew distDocker -PdockerImagePrefix=testing $GRADLE_PROJS_SKIP - -TERM=dumb ./gradlew :core:lean:distDockerCoverage -PdockerImagePrefix=testing - -echo "Time taken for ${0##*/} is $SECONDS secs" diff --git a/tools/travis/setupLeanSystem.sh b/tools/travis/setupLeanSystem.sh deleted file mode 100755 index cfb20c73ad2..00000000000 --- a/tools/travis/setupLeanSystem.sh +++ /dev/null @@ -1,34 +0,0 @@ -#!/bin/bash -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -set -e - -# Build script for Travis-CI. -SECONDS=0 -SCRIPTDIR=$(cd $(dirname "$0") && pwd) -ROOTDIR="$SCRIPTDIR/../.." -RUNTIMES_MANIFEST=${1:-"/ansible/files/runtimes.json"} - -cd $ROOTDIR/ansible - -#$ANSIBLE_CMD openwhisk-lean.yml -e manifest_file="$RUNTIMES_MANIFEST" -$ANSIBLE_CMD openwhisk-lean.yml -e manifest_file="$RUNTIMES_MANIFEST" -$ANSIBLE_CMD apigateway.yml -$ANSIBLE_CMD routemgmt.yml - -echo "Time taken for ${0##*/} is $SECONDS secs" diff --git a/tools/vagrant/README.md b/tools/vagrant/README.md index ea9e5244a4d..63401322d6a 100644 --- a/tools/vagrant/README.md +++ b/tools/vagrant/README.md @@ -307,12 +307,3 @@ Ignore error message `Sub-process /usr/bin/dpkg returned an error code (1)` when creating Vagrant VM using `gui-true`. Remember to use `gui=true` every time you do `vagrant reload`. Or, you can enable the GUI directly by editing the Vagrant file. - -## Lean Setup -To have a lean setup (no Kafka, Zookeeper and no Invokers as separate entities) - -Set environment variable LEAN to true before creating vagrant VM -``` -export LEAN=true -``` - diff --git a/tools/vagrant/Vagrantfile b/tools/vagrant/Vagrantfile index 39fc9b901a1..2466e352384 100644 --- a/tools/vagrant/Vagrantfile +++ b/tools/vagrant/Vagrantfile @@ -149,16 +149,7 @@ Vagrant.configure('2') do |config| echo "`date`: deploy-start" >> /tmp/vagrant-times.txt cd ${ANSIBLE_HOME} su vagrant -c 'ansible-playbook -i environments/vagrant wipe.yml' - - export LEAN=#{ENV['LEAN']} || "false" - if [[ $LEAN == "true" ]]; then - # Deploy Lean Openwhisk (consolidated controller + invoker without kafka, zookeeper etc.) - su vagrant -c 'ansible-playbook -i environments/vagrant openwhisk-lean.yml -e invoker_use_runc=False -e controller_akka_provider=local' - else - # Deploy full Openwhisk stack - su vagrant -c 'ansible-playbook -i environments/vagrant openwhisk.yml -e invoker_use_runc=False' - fi - + su vagrant -c 'ansible-playbook -i environments/vagrant openwhisk.yml -e invoker_use_runc=False' su vagrant -c 'ansible-playbook -i environments/vagrant postdeploy.yml' su vagrant -c 'ansible-playbook -i environments/vagrant apigateway.yml' su vagrant -c 'ansible-playbook -i environments/vagrant routemgmt.yml'