diff --git a/orca-core/src/main/groovy/com/netflix/spinnaker/orca/pipeline/ExecutionRunner.java b/orca-core/src/main/groovy/com/netflix/spinnaker/orca/pipeline/ExecutionRunner.java
index d506b08d96..5b4c8220df 100644
--- a/orca-core/src/main/groovy/com/netflix/spinnaker/orca/pipeline/ExecutionRunner.java
+++ b/orca-core/src/main/groovy/com/netflix/spinnaker/orca/pipeline/ExecutionRunner.java
@@ -29,6 +29,11 @@ default > void restart(
     throw new UnsupportedOperationException();
   }
 
+  default > void reschedule(
+    @Nonnull T execution) throws Exception {
+    throw new UnsupportedOperationException();
+  }
+
   default > void unpause(
     @Nonnull T execution) throws Exception {
     throw new UnsupportedOperationException();
diff --git a/orca-queue-redis/src/main/kotlin/com/netflix/spinnaker/orca/q/redis/RedisQueue.kt b/orca-queue-redis/src/main/kotlin/com/netflix/spinnaker/orca/q/redis/RedisQueue.kt
index b65289ac62..7a186d99c5 100644
--- a/orca-queue-redis/src/main/kotlin/com/netflix/spinnaker/orca/q/redis/RedisQueue.kt
+++ b/orca-queue-redis/src/main/kotlin/com/netflix/spinnaker/orca/q/redis/RedisQueue.kt
@@ -33,6 +33,7 @@ import org.springframework.scheduling.annotation.Scheduled
 import redis.clients.jedis.Jedis
 import redis.clients.jedis.JedisCommands
 import redis.clients.jedis.Transaction
+import redis.clients.jedis.params.sortedset.ZAddParams
 import redis.clients.util.Pool
 import java.io.IOException
 import java.nio.charset.Charset
@@ -110,6 +111,21 @@ class RedisQueue(
     }
   }
 
+  override fun reschedule(message: Message, delay: TemporalAmount) {
+    pool.resource.use { redis ->
+      val fingerprint = message.hash()
+      log.debug("Re-scheduling message: $message, fingerprint: $fingerprint to deliver in $delay")
+      // XX only re-scores a fingerprint that is already queued; CH makes ZADD report
+      // updated members (without it the reply counts additions, which XX forbids, so it is always 0).
+      val status: Long = redis.zadd(queueKey, score(delay), fingerprint, ZAddParams.zAddParams().xx().ch())
+      if (status.toInt() == 1) {
+        fire<MessageRescheduled>(message)
+      } else {
+        fire<MessageNotFound>(message)
+      }
+    }
+  }
+
   @Scheduled(fixedDelayString = "\${queue.retry.frequency.ms:10000}")
   override fun retry() {
pool.resource.use { redis -> diff --git a/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/Message.kt b/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/Message.kt index e7d9a4464b..c1036871a1 100644 --- a/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/Message.kt +++ b/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/Message.kt @@ -163,6 +163,9 @@ data class RunTask( constructor(message: TaskLevel, taskType: Class) : this(message.executionType, message.executionId, message.application, message.stageId, message.taskId, taskType) + + constructor(source: ExecutionLevel, stageId: String, taskId: String, taskType: Class) : + this(source.executionType, source.executionId, source.application, stageId, taskId, taskType) } data class StartStage( @@ -292,6 +295,15 @@ data class StartExecution( this(source.javaClass, source.getId(), source.getApplication()) } +data class RescheduleExecution( + override val executionType: Class>, + override val executionId: String, + override val application: String +) : Message(), ExecutionLevel { + constructor(source: Execution<*>) : + this(source.javaClass, source.getId(), source.getApplication()) +} + data class CompleteExecution( override val executionType: Class>, override val executionId: String, diff --git a/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/Queue.kt b/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/Queue.kt index c1bd1fa8c1..90951258ee 100644 --- a/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/Queue.kt +++ b/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/Queue.kt @@ -49,6 +49,16 @@ interface Queue { */ fun push(message: Message, delay: TemporalAmount): Unit + /** + * Update [message] if it exists for immediate delivery. + */ + fun reschedule(message: Message): Unit = reschedule(message, ZERO) + + /** + * Update [message] if it exists for delivery after [delay]. 
+ */ + fun reschedule(message: Message, delay: TemporalAmount): Unit + /** * Check for any un-acknowledged messages that are overdue and move them back * onto the queue. diff --git a/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/QueueExecutionRunner.kt b/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/QueueExecutionRunner.kt index eff0f68b9c..c2ee70df26 100644 --- a/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/QueueExecutionRunner.kt +++ b/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/QueueExecutionRunner.kt @@ -29,6 +29,10 @@ class QueueExecutionRunner( override fun > start(execution: T) = queue.push(StartExecution(execution)) + override fun > reschedule(execution: T) { + queue.push(RescheduleExecution(execution)) + } + override fun > restart(execution: T, stageId: String) { queue.push(RestartStage(execution, stageId, AuthenticatedRequest.getSpinnakerUser().orElse(null))) } diff --git a/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/handler/RescheduleExecutionHandler.kt b/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/handler/RescheduleExecutionHandler.kt new file mode 100644 index 0000000000..2040a5c300 --- /dev/null +++ b/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/handler/RescheduleExecutionHandler.kt @@ -0,0 +1,52 @@ +/* + * Copyright 2017 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License") + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.netflix.spinnaker.orca.q.handler + +import com.netflix.spinnaker.orca.ExecutionStatus +import com.netflix.spinnaker.orca.Task +import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository +import com.netflix.spinnaker.orca.q.* +import org.springframework.stereotype.Component + +@Component +class RescheduleExecutionHandler( + override val queue: Queue, + override val repository: ExecutionRepository +) : MessageHandler { + + override val messageType = RescheduleExecution::class.java + + @Suppress("UNCHECKED_CAST") + override fun handle(message: RescheduleExecution) { + message.withExecution { execution -> + execution + .getStages() + .filter { it.getStatus() == ExecutionStatus.RUNNING } + .forEach { stage -> + stage.getTasks() + .filter { it.status == ExecutionStatus.RUNNING } + .forEach { + queue.reschedule(RunTask(message, + stage.getId(), + it.id, + Class.forName(it.implementingClass) as Class + )) + } + } + } + } +} diff --git a/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/memory/InMemoryQueue.kt b/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/memory/InMemoryQueue.kt index 0d88040d9c..ab1ba37757 100644 --- a/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/memory/InMemoryQueue.kt +++ b/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/memory/InMemoryQueue.kt @@ -71,6 +71,13 @@ class InMemoryQueue( } } + override fun reschedule(message: Message, delay: TemporalAmount) { + val existed = queue.removeIf { it.payload == message } + if (existed) { + queue.put(Envelope(message, clock.instant().plus(delay), clock)) + } + } + @Scheduled(fixedDelayString = "\${queue.retry.frequency.ms:10000}") override fun retry() { val now = clock.instant() diff --git a/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/metrics/AtlasQueueMonitor.kt b/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/metrics/AtlasQueueMonitor.kt index 55701e0bf4..d260f32869 100644 --- 
a/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/metrics/AtlasQueueMonitor.kt +++ b/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/metrics/AtlasQueueMonitor.kt @@ -57,6 +57,8 @@ open class AtlasQueueMonitor is MessageDead -> event.counter.increment() is MessageDuplicate -> event.counter.increment() is LockFailed -> event.counter.increment() + is MessageRescheduled -> event.counter.increment() + is MessageNotFound -> event.counter.increment() } } @@ -155,4 +157,18 @@ open class AtlasQueueMonitor */ private val LockFailed.counter: Counter get() = registry.counter("queue.lock.failed") + + /** + * Count of attempted message rescheduling that succeeded (in other words, + * that message existed on the queue). + */ + private val MessageRescheduled.counter: Counter + get() = registry.counter("queue.reschedule.succeeded") + + /** + * Count of attempted message rescheduling that failed (in other words, + * that message did not exist on the queue). + */ + private val MessageNotFound.counter: Counter + get() = registry.counter("queue.message.notfound") } diff --git a/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/metrics/MonitorableQueue.kt b/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/metrics/MonitorableQueue.kt index 03f2194fc7..90fc601025 100644 --- a/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/metrics/MonitorableQueue.kt +++ b/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/metrics/MonitorableQueue.kt @@ -44,6 +44,8 @@ inline fun MonitorableQueue.fire(message: Message? = nu LockFailed::class -> LockFailed(this) MessagePushed::class -> MessagePushed(this, message!!) MessageDuplicate::class -> MessageDuplicate(this, message!!) + MessageRescheduled::class -> MessageRescheduled(this, message!!) + MessageNotFound::class -> MessageNotFound(this, message!!) 
else -> throw IllegalArgumentException("Unknown event type ${E::class}") } publisher.publishEvent(event) diff --git a/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/metrics/QueueEvent.kt b/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/metrics/QueueEvent.kt index 7590dad4d0..38a6c6992f 100644 --- a/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/metrics/QueueEvent.kt +++ b/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/metrics/QueueEvent.kt @@ -38,4 +38,5 @@ class MessageRetried(source: MonitorableQueue) : QueueEvent(source) class MessageDead(source: MonitorableQueue) : QueueEvent(source) class MessageDuplicate(source: MonitorableQueue, payload: Message) : PayloadQueueEvent(source, payload) class LockFailed(source: MonitorableQueue) : QueueEvent(source) - +class MessageRescheduled(source: MonitorableQueue, payload: Message) : PayloadQueueEvent(source, payload) +class MessageNotFound(source: MonitorableQueue, payload: Message) : PayloadQueueEvent(source, payload) diff --git a/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/trafficshaping/TrafficShapingQueue.kt b/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/trafficshaping/TrafficShapingQueue.kt index 86e6565920..a48f5150e1 100644 --- a/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/trafficshaping/TrafficShapingQueue.kt +++ b/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/trafficshaping/TrafficShapingQueue.kt @@ -91,6 +91,8 @@ class TrafficShapingQueue( override fun push(message: Message, delay: TemporalAmount) = queueImpl.push(message, delay) + override fun reschedule(message: Message, delay: TemporalAmount) = queueImpl.reschedule(message, delay) + override fun retry() { queueImpl.retry() } diff --git a/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/QueueTest.kt b/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/QueueTest.kt index e701fedd6a..24eaa41ad4 100644 --- 
a/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/QueueTest.kt +++ b/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/QueueTest.kt @@ -30,6 +30,7 @@ import org.threeten.extra.Hours import java.io.Closeable import java.time.Clock import java.time.Duration +import java.time.Duration.ZERO abstract class QueueTest( createQueue: (Clock, DeadMessageCallback) -> Q, @@ -333,6 +334,53 @@ abstract class QueueTest( } } + and("the message delivery time is updated") { + val delay = Hours.of(1) + + beforeGroup { + queue = createQueue(clock, deadLetterCallback).apply { + push(message, delay) + reschedule(message, ZERO) + } + } + + afterGroup(::stopQueue) + afterGroup(::resetMocks) + + on("polling the queue") { + queue!!.poll(callback) + } + + it("delivers the message immediately and only once") { + verify(callback).invoke(eq(message), any()) + } + + it("does not deliver again"){ + verifyNoMoreInteractions(callback) + } + } + + and("the delivery time for a message that isn't on the queue isn't updated") { + val message2 = StartExecution(Pipeline::class.java, "2", "bar") + + beforeGroup { + queue = createQueue(clock, deadLetterCallback).apply { + reschedule(message2, ZERO) + } + } + + afterGroup(::stopQueue) + afterGroup(::resetMocks) + + on("polling the queue") { + queue!!.poll(callback) + } + + it("there are no messages on the queue"){ + verifyNoMoreInteractions(callback) + } + } + and("a different message is pushed before acknowledging the first") { val newMessage = message.copy(executionId = "2") diff --git a/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/RescheduleExecutionHandlerTest.kt b/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/RescheduleExecutionHandlerTest.kt new file mode 100644 index 0000000000..ef7b7d43cd --- /dev/null +++ b/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/RescheduleExecutionHandlerTest.kt @@ -0,0 +1,97 @@ +/* + * Copyright 2017 Netflix, Inc. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License") + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.netflix.spinnaker.orca.q.handler + +import com.netflix.spinnaker.orca.ExecutionStatus +import com.netflix.spinnaker.orca.Task +import com.netflix.spinnaker.orca.pipeline.model.Pipeline +import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository +import com.netflix.spinnaker.orca.q.* +import com.nhaarman.mockito_kotlin.* +import org.jetbrains.spek.api.dsl.describe +import org.jetbrains.spek.api.dsl.it +import org.jetbrains.spek.api.lifecycle.CachingMode +import org.jetbrains.spek.subject.SubjectSpek + +object RescheduleExecutionHandlerTest : SubjectSpek({ + + val queue: Queue = mock() + val repository: ExecutionRepository = mock() + + subject(CachingMode.GROUP) { + RescheduleExecutionHandler(queue, repository) + } + + fun resetMocks() = reset(queue, repository) + + describe("reschedule an execution") { + val pipeline = pipeline { + application = "spinnaker" + status = ExecutionStatus.RUNNING + stage { + refId = "1" + status = ExecutionStatus.SUCCEEDED + } + stage { + refId = "2a" + requisiteStageRefIds = listOf("1") + status = ExecutionStatus.RUNNING + task { + id = "4" + status = ExecutionStatus.RUNNING + } + } + stage { + refId = "2b" + requisiteStageRefIds = listOf("1") + status = ExecutionStatus.RUNNING + task { + id = "5" + status = ExecutionStatus.RUNNING + } + } + stage { + refId = "3" + requisiteStageRefIds = listOf("2a", "2b") + status = ExecutionStatus.NOT_STARTED + 
} + } + val message = RescheduleExecution(Pipeline::class.java, pipeline.id, pipeline.application) + + beforeGroup { + whenever(repository.retrievePipeline(pipeline.id)) doReturn pipeline + } + + afterGroup(::resetMocks) + + action("the handler receives a message") { + subject.handle(message) + } + + @Suppress("UNCHECKED_CAST") + it("it updates the time for each running task") { + val stage2a = pipeline.stageByRef("2a") + val stage2b = pipeline.stageByRef("2b") + val task4 = stage2a.taskById("4") + val task5 = stage2b.taskById("5") + + verify(queue).reschedule(RunTask(message, stage2a.id, task4.id, Class.forName(task4.implementingClass) as Class)) + verify(queue).reschedule(RunTask(message, stage2b.id, task5.id, Class.forName(task5.implementingClass) as Class)) + verifyNoMoreInteractions(queue) + } + } +}) diff --git a/orca-web/src/main/groovy/com/netflix/spinnaker/orca/controllers/TaskController.groovy b/orca-web/src/main/groovy/com/netflix/spinnaker/orca/controllers/TaskController.groovy index 67b8d04072..af36de5e15 100644 --- a/orca-web/src/main/groovy/com/netflix/spinnaker/orca/controllers/TaskController.groovy +++ b/orca-web/src/main/groovy/com/netflix/spinnaker/orca/controllers/TaskController.groovy @@ -215,6 +215,8 @@ class TaskController { @ResponseStatus(HttpStatus.ACCEPTED) void pause(@PathVariable String id) { executionRepository.pause(id, AuthenticatedRequest.getSpinnakerUser().orElse("anonymous")) + def pipeline = executionRepository.retrievePipeline(id) + executionRunner.reschedule(pipeline) } @PreAuthorize("hasPermission(this.getPipeline(#id)?.application, 'APPLICATION', 'WRITE')")