Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(queue): update delivery time on runtask #1676

Merged
merged 1 commit into from
Oct 17, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ default <T extends Execution<T>> void restart(
throw new UnsupportedOperationException();
}

default <T extends Execution<T>> void reschedule(
@Nonnull T execution) throws Exception {
throw new UnsupportedOperationException();
}

default <T extends Execution<T>> void unpause(
@Nonnull T execution) throws Exception {
throw new UnsupportedOperationException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import org.springframework.scheduling.annotation.Scheduled
import redis.clients.jedis.Jedis
import redis.clients.jedis.JedisCommands
import redis.clients.jedis.Transaction
import redis.clients.jedis.params.sortedset.ZAddParams
import redis.clients.util.Pool
import java.io.IOException
import java.nio.charset.Charset
Expand Down Expand Up @@ -110,6 +111,19 @@ class RedisQueue(
}
}

override fun reschedule(message: Message, delay: TemporalAmount) {
pool.resource.use { redis ->
val fingerprint = message.hash()
log.debug("Re-scheduling message: $message, fingerprint: $fingerprint to deliver in $delay")
val status: Long = redis.zadd(queueKey, score(delay), fingerprint, ZAddParams.zAddParams().xx())
if (status.toInt() == 1){
fire<MessageRescheduled>(message)
} else {
fire<MessageNotFound>(message)
}
}
}

@Scheduled(fixedDelayString = "\${queue.retry.frequency.ms:10000}")
override fun retry() {
pool.resource.use { redis ->
Expand Down
12 changes: 12 additions & 0 deletions orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/Message.kt
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,9 @@ data class RunTask(

constructor(message: TaskLevel, taskType: Class<out Task>) :
this(message.executionType, message.executionId, message.application, message.stageId, message.taskId, taskType)

constructor(source: ExecutionLevel, stageId: String, taskId: String, taskType: Class<out Task>) :
this(source.executionType, source.executionId, source.application, stageId, taskId, taskType)
}

data class StartStage(
Expand Down Expand Up @@ -292,6 +295,15 @@ data class StartExecution(
this(source.javaClass, source.getId(), source.getApplication())
}

data class RescheduleExecution(
override val executionType: Class<out Execution<*>>,
override val executionId: String,
override val application: String
) : Message(), ExecutionLevel {
constructor(source: Execution<*>) :
this(source.javaClass, source.getId(), source.getApplication())
}

data class CompleteExecution(
override val executionType: Class<out Execution<*>>,
override val executionId: String,
Expand Down
10 changes: 10 additions & 0 deletions orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/Queue.kt
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,16 @@ interface Queue {
*/
fun push(message: Message, delay: TemporalAmount): Unit

/**
* Update [message] if it exists for immediate delivery.
*/
fun reschedule(message: Message): Unit = reschedule(message, ZERO)

/**
* Update [mesasge] if it exists for delivery after [delay].
*/
fun reschedule(message: Message, delay: TemporalAmount): Unit

/**
* Check for any un-acknowledged messages that are overdue and move them back
* onto the queue.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ class QueueExecutionRunner(
override fun <T : Execution<T>> start(execution: T) =
queue.push(StartExecution(execution))

override fun <T : Execution<T>> reschedule(execution: T) {
queue.push(RescheduleExecution(execution))
}

override fun <T : Execution<T>> restart(execution: T, stageId: String) {
queue.push(RestartStage(execution, stageId, AuthenticatedRequest.getSpinnakerUser().orElse(null)))
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright 2017 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License")
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.netflix.spinnaker.orca.q.handler

import com.netflix.spinnaker.orca.ExecutionStatus
import com.netflix.spinnaker.orca.Task
import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository
import com.netflix.spinnaker.orca.q.*
import org.springframework.stereotype.Component

@Component
class RescheduleExecutionHandler(
override val queue: Queue,
override val repository: ExecutionRepository
) : MessageHandler<RescheduleExecution> {

override val messageType = RescheduleExecution::class.java

@Suppress("UNCHECKED_CAST")
override fun handle(message: RescheduleExecution) {
message.withExecution { execution ->
execution
.getStages()
.filter { it.getStatus() == ExecutionStatus.RUNNING }
.forEach { stage ->
stage.getTasks()
.filter { it.status == ExecutionStatus.RUNNING }
.forEach {
queue.reschedule(RunTask(message,
stage.getId(),
it.id,
Class.forName(it.implementingClass) as Class<out Task>
))
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,13 @@ class InMemoryQueue(
}
}

override fun reschedule(message: Message, delay: TemporalAmount) {
val existed = queue.removeIf { it.payload == message }
if (existed) {
queue.put(Envelope(message, clock.instant().plus(delay), clock))
}
}

@Scheduled(fixedDelayString = "\${queue.retry.frequency.ms:10000}")
override fun retry() {
val now = clock.instant()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ open class AtlasQueueMonitor
is MessageDead -> event.counter.increment()
is MessageDuplicate -> event.counter.increment()
is LockFailed -> event.counter.increment()
is MessageRescheduled -> event.counter.increment()
is MessageNotFound -> event.counter.increment()
}
}

Expand Down Expand Up @@ -155,4 +157,18 @@ open class AtlasQueueMonitor
*/
private val LockFailed.counter: Counter
get() = registry.counter("queue.lock.failed")

/**
* Count of attempted message rescheduling that succeeded (in other words,
* that message existed on the queue).
*/
private val MessageRescheduled.counter: Counter
get() = registry.counter("queue.reschedule.succeeded")

/**
* Count of attempted message rescheduling that failed (in other words,
* that message did not exist on the queue).
*/
private val MessageNotFound.counter: Counter
get() = registry.counter("queue.message.notfound")
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ inline fun <reified E : QueueEvent> MonitorableQueue.fire(message: Message? = nu
LockFailed::class -> LockFailed(this)
MessagePushed::class -> MessagePushed(this, message!!)
MessageDuplicate::class -> MessageDuplicate(this, message!!)
MessageRescheduled::class -> MessageRescheduled(this, message!!)
MessageNotFound::class -> MessageNotFound(this, message!!)
else -> throw IllegalArgumentException("Unknown event type ${E::class}")
}
publisher.publishEvent(event)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,5 @@ class MessageRetried(source: MonitorableQueue) : QueueEvent(source)
class MessageDead(source: MonitorableQueue) : QueueEvent(source)
class MessageDuplicate(source: MonitorableQueue, payload: Message) : PayloadQueueEvent(source, payload)
class LockFailed(source: MonitorableQueue) : QueueEvent(source)

class MessageRescheduled(source: MonitorableQueue, payload: Message) : PayloadQueueEvent(source, payload)
class MessageNotFound(source: MonitorableQueue, payload: Message) : PayloadQueueEvent(source, payload)
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ class TrafficShapingQueue(

override fun push(message: Message, delay: TemporalAmount) = queueImpl.push(message, delay)

override fun reschedule(message: Message, delay: TemporalAmount) = queueImpl.reschedule(message, delay)

override fun retry() {
queueImpl.retry()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import org.threeten.extra.Hours
import java.io.Closeable
import java.time.Clock
import java.time.Duration
import java.time.Duration.ZERO

abstract class QueueTest<out Q : Queue>(
createQueue: (Clock, DeadMessageCallback) -> Q,
Expand Down Expand Up @@ -333,6 +334,53 @@ abstract class QueueTest<out Q : Queue>(
}
}

and("the message delivery time is updated") {
val delay = Hours.of(1)

beforeGroup {
queue = createQueue(clock, deadLetterCallback).apply {
push(message, delay)
reschedule(message, ZERO)
}
}

afterGroup(::stopQueue)
afterGroup(::resetMocks)

on("polling the queue") {
queue!!.poll(callback)
}

it("delivers the message immediately and only once") {
verify(callback).invoke(eq(message), any())
}

it("does not deliver again"){
verifyNoMoreInteractions(callback)
}
}

and("the delivery time for a message that isn't on the queue isn't updated") {
val message2 = StartExecution(Pipeline::class.java, "2", "bar")

beforeGroup {
queue = createQueue(clock, deadLetterCallback).apply {
reschedule(message2, ZERO)
}
}

afterGroup(::stopQueue)
afterGroup(::resetMocks)

on("polling the queue") {
queue!!.poll(callback)
}

it("there are no messages on the queue"){
verifyNoMoreInteractions(callback)
}
}

and("a different message is pushed before acknowledging the first") {
val newMessage = message.copy(executionId = "2")

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Copyright 2017 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License")
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.netflix.spinnaker.orca.q.handler

import com.netflix.spinnaker.orca.ExecutionStatus
import com.netflix.spinnaker.orca.Task
import com.netflix.spinnaker.orca.pipeline.model.Pipeline
import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository
import com.netflix.spinnaker.orca.q.*
import com.nhaarman.mockito_kotlin.*
import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.it
import org.jetbrains.spek.api.lifecycle.CachingMode
import org.jetbrains.spek.subject.SubjectSpek

object RescheduleExecutionHandlerTest : SubjectSpek<RescheduleExecutionHandler>({

val queue: Queue = mock()
val repository: ExecutionRepository = mock()

subject(CachingMode.GROUP) {
RescheduleExecutionHandler(queue, repository)
}

fun resetMocks() = reset(queue, repository)

describe("reschedule an execution") {
val pipeline = pipeline {
application = "spinnaker"
status = ExecutionStatus.RUNNING
stage {
refId = "1"
status = ExecutionStatus.SUCCEEDED
}
stage {
refId = "2a"
requisiteStageRefIds = listOf("1")
status = ExecutionStatus.RUNNING
task {
id = "4"
status = ExecutionStatus.RUNNING
}
}
stage {
refId = "2b"
requisiteStageRefIds = listOf("1")
status = ExecutionStatus.RUNNING
task {
id = "5"
status = ExecutionStatus.RUNNING
}
}
stage {
refId = "3"
requisiteStageRefIds = listOf("2a", "2b")
status = ExecutionStatus.NOT_STARTED
}
}
val message = RescheduleExecution(Pipeline::class.java, pipeline.id, pipeline.application)

beforeGroup {
whenever(repository.retrievePipeline(pipeline.id)) doReturn pipeline
}

afterGroup(::resetMocks)

action("the handler receives a message") {
subject.handle(message)
}

@Suppress("UNCHECKED_CAST")
it("it updates the time for each running task") {
val stage2a = pipeline.stageByRef("2a")
val stage2b = pipeline.stageByRef("2b")
val task4 = stage2a.taskById("4")
val task5 = stage2b.taskById("5")

verify(queue).reschedule(RunTask(message, stage2a.id, task4.id, Class.forName(task4.implementingClass) as Class<out Task>))
verify(queue).reschedule(RunTask(message, stage2b.id, task5.id, Class.forName(task5.implementingClass) as Class<out Task>))
verifyNoMoreInteractions(queue)
}
}
})
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,8 @@ class TaskController {
@ResponseStatus(HttpStatus.ACCEPTED)
void pause(@PathVariable String id) {
executionRepository.pause(id, AuthenticatedRequest.getSpinnakerUser().orElse("anonymous"))
def pipeline = executionRepository.retrievePipeline(id)
executionRunner.reschedule(pipeline)
}

@PreAuthorize("hasPermission(this.getPipeline(#id)?.application, 'APPLICATION', 'WRITE')")
Expand Down