Skip to content

Commit

Permalink
feat(core): Add correlation ids to orchestrations (#1748)
Browse files Browse the repository at this point in the history
Allows us to repeatedly send orchestrations with the same correlation id
and only running a single one. Pre-req for keel.
  • Loading branch information
robzienert authored Nov 5, 2017
1 parent 57856b1 commit 5aa1af3
Show file tree
Hide file tree
Showing 8 changed files with 141 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,6 @@

package com.netflix.spinnaker.orca.pipeline;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.netflix.spinnaker.orca.ExecutionStatus;
import com.netflix.spinnaker.orca.pipeline.model.Execution;
Expand All @@ -28,6 +24,12 @@
import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

import static java.lang.Boolean.parseBoolean;
import static java.util.Collections.emptyList;
import static java.util.Collections.emptyMap;
Expand All @@ -52,6 +54,11 @@ protected ExecutionLauncher(ObjectMapper objectMapper,
public T start(String configJson) throws Exception {
final T execution = parse(configJson);

final T existingExecution = checkForCorrelatedExecution(execution);
if (existingExecution != null) {
return existingExecution;
}

checkRunnable(execution);

persistExecution(execution);
Expand Down Expand Up @@ -79,6 +86,12 @@ public T start(T execution) throws Exception {
return execution;
}

protected T checkForCorrelatedExecution(T execution) {
// Correlated executions currently only supported by Orchestrations. Just lazy, and a carrot to
// refactoring out distinction between Pipeline / Orchestration.
return null;
}

protected T handleStartupFailure(T execution, Throwable failure) {
final String canceledBy = "system";
final String reason = "Failed on startup: " + failure.getMessage();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,30 @@

package com.netflix.spinnaker.orca.pipeline;

import java.io.IOException;
import java.io.Serializable;
import java.time.Clock;
import java.util.Map;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.netflix.spinnaker.orca.pipeline.model.Execution.AuthenticationDetails;
import com.netflix.spinnaker.orca.pipeline.model.Orchestration;
import com.netflix.spinnaker.orca.pipeline.model.Stage;
import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionNotFoundException;
import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.io.Serializable;
import java.time.Clock;
import java.util.Map;

import static com.netflix.spinnaker.orca.pipeline.model.Execution.ExecutionEngine.v3;
import static java.lang.String.format;

@Component
public class OrchestrationLauncher extends ExecutionLauncher<Orchestration> {

private final static Logger log = LoggerFactory.getLogger(OrchestrationLauncher.class);

private final Clock clock;

@Autowired
Expand Down Expand Up @@ -72,6 +79,10 @@ protected Orchestration parse(String configJson) throws IOException {
orchestration.getStages().add(stage);
}

if (config.get("trigger") != null) {
orchestration.getTrigger().putAll((Map<String, Object>) config.get("trigger"));
}

orchestration.setBuildTime(clock.millis());
orchestration.setAuthentication(AuthenticationDetails.build().orElse(new AuthenticationDetails()));
orchestration.setOrigin((String) config.getOrDefault("origin", "unknown"));
Expand All @@ -82,4 +93,25 @@ protected Orchestration parse(String configJson) throws IOException {
@Override protected void persistExecution(Orchestration execution) {
executionRepository.store(execution);
}

@Override
protected Orchestration checkForCorrelatedExecution(Orchestration execution) {
if (!execution.getTrigger().containsKey("correlationId")) {
return null;
}

try {
Orchestration o = executionRepository.retrieveOrchestrationForCorrelationId(
execution.getTrigger().get("correlationId").toString()
);
log.info("Found pre-existing Orchestration by correlation id (id: " +
o.getId() + ", correlationId: " +
execution.getTrigger().get("correlationId") +
")");
return o;
} catch (ExecutionNotFoundException e) {
// Swallow
}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,12 @@ public void setOrigin(@Nullable String origin) {
this.origin = origin;
}

private final Map<String, Object> trigger = new HashMap<>();

public @Nonnull Map<String, Object> getTrigger() {
return trigger;
}

@Nullable
public Stage<T> namedStage(String type) {
return stages
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,17 @@

package com.netflix.spinnaker.orca.pipeline.model;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.netflix.spectator.api.Registry;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.netflix.spectator.api.Registry;

public class Pipeline extends Execution<Pipeline> {

Expand All @@ -50,12 +51,6 @@ public void setPipelineConfigId(@Nullable String pipelineConfigId) {
this.pipelineConfigId = pipelineConfigId;
}

private final Map<String, Object> trigger = new HashMap<>();

public @Nonnull Map<String, Object> getTrigger() {
return trigger;
}

private final List<Map<String, Object>> notifications = new ArrayList<>();

public @Nonnull List<Map<String, Object>> getNotifications() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ void resume(
@Nonnull Observable<Orchestration> retrieveOrchestrationsForApplication(
@Nonnull String application, @Nonnull ExecutionCriteria criteria);

@Nonnull Orchestration retrieveOrchestrationForCorrelationId(
@Nonnull String correlationId) throws ExecutionNotFoundException;

class ExecutionCriteria {
public int getLimit() {
return limit;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,18 @@

package com.netflix.spinnaker.orca.pipeline.persistence.jedis

import java.util.concurrent.Executors
import java.util.function.Function
import com.fasterxml.jackson.core.type.TypeReference
import com.fasterxml.jackson.databind.ObjectMapper
import com.netflix.spectator.api.Registry
import com.netflix.spinnaker.orca.ExecutionStatus
import com.netflix.spinnaker.orca.jackson.OrcaObjectMapper
import com.netflix.spinnaker.orca.pipeline.model.*
import com.netflix.spinnaker.orca.pipeline.model.AlertOnAccessMap
import com.netflix.spinnaker.orca.pipeline.model.Execution
import com.netflix.spinnaker.orca.pipeline.model.Orchestration
import com.netflix.spinnaker.orca.pipeline.model.Pipeline
import com.netflix.spinnaker.orca.pipeline.model.Stage
import com.netflix.spinnaker.orca.pipeline.model.SyntheticStageOwner
import com.netflix.spinnaker.orca.pipeline.model.Task
import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionNotFoundException
import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository
import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository.ExecutionCriteria
Expand All @@ -41,12 +45,19 @@ import rx.Observable
import rx.Scheduler
import rx.functions.Func1
import rx.schedulers.Schedulers

import javax.annotation.Nonnull
import java.util.concurrent.Executors
import java.util.function.Function

import static com.google.common.base.Predicates.notNull
import static com.google.common.collect.Maps.filterValues
import static com.netflix.spinnaker.orca.pipeline.model.Execution.DEFAULT_EXECUTION_ENGINE
import static com.netflix.spinnaker.orca.pipeline.model.SyntheticStageOwner.STAGE_BEFORE
import static java.lang.System.currentTimeMillis
import static java.util.Collections.*
import static java.util.Collections.emptyList
import static java.util.Collections.emptyMap
import static java.util.Collections.emptySet
import static redis.clients.jedis.BinaryClient.LIST_POSITION.AFTER
import static redis.clients.jedis.BinaryClient.LIST_POSITION.BEFORE

Expand Down Expand Up @@ -509,6 +520,25 @@ class JedisExecutionRepository implements ExecutionRepository {
return currentObservable
}

@Override
Orchestration retrieveOrchestrationForCorrelationId(@Nonnull String correlationId) throws ExecutionNotFoundException {
String key = "correlation:$correlationId"
withJedis(getJedisPoolForId(key)) { Jedis correlationJedis ->
def orchestrationId = correlationJedis.get(key)

if (orchestrationId != null) {
def orchestration = withJedis(getJedisPoolForId(orchestrationId)) { Jedis jedis ->
retrieveInternal(jedis, Orchestration, orchestrationId)
}
if (!orchestration.status.isComplete()) {
return orchestration
}
correlationJedis.del(key)
}
throw new ExecutionNotFoundException("No Orchestration found for correlation ID $correlationId")
}
}

private void storeExecutionInternal(Transaction tx, Execution execution) {
def prefix = execution.getClass().simpleName.toLowerCase()

Expand All @@ -529,7 +559,8 @@ class JedisExecutionRepository implements ExecutionRepository {
paused : mapper.writeValueAsString(execution.paused),
keepWaitingPipelines: String.valueOf(execution.keepWaitingPipelines),
executionEngine : execution.executionEngine?.name() ?: DEFAULT_EXECUTION_ENGINE.name(),
origin : execution.origin?.toString()
origin : execution.origin?.toString(),
trigger : mapper.writeValueAsString(execution.trigger)
]
map.stageIndex = execution.stages.id.join(",")
// TODO: remove this and only use the list
Expand All @@ -543,12 +574,14 @@ class JedisExecutionRepository implements ExecutionRepository {
if (execution instanceof Pipeline) {
map.name = execution.name
map.pipelineConfigId = execution.pipelineConfigId
map.trigger = mapper.writeValueAsString(execution.trigger)
map.notifications = mapper.writeValueAsString(execution.notifications)
map.initialConfig = mapper.writeValueAsString(execution.initialConfig)
} else if (execution instanceof Orchestration) {
map.description = execution.description
}
if (execution.trigger.containsKey("correlationId")) {
tx.set("correlation:${execution.trigger['correlationId']}", execution.id)
}

tx.hdel(key, "config")
tx.hmset(key, filterValues(map, notNull()))
Expand Down Expand Up @@ -620,6 +653,8 @@ class JedisExecutionRepository implements ExecutionRepository {
execution.paused = map.paused ? mapper.readValue(map.paused, Execution.PausedDetails) : null
execution.keepWaitingPipelines = Boolean.parseBoolean(map.keepWaitingPipelines)
execution.origin = map.origin
execution.trigger.putAll(map.trigger ? mapper.readValue(map.trigger, Map) : [:])

try {
execution.executionEngine = map.executionEngine == null ? DEFAULT_EXECUTION_ENGINE : Execution.ExecutionEngine.valueOf(map.executionEngine)
} catch (IllegalArgumentException e) {
Expand Down Expand Up @@ -651,7 +686,6 @@ class JedisExecutionRepository implements ExecutionRepository {
if (execution instanceof Pipeline) {
execution.name = map.name
execution.pipelineConfigId = map.pipelineConfigId
execution.trigger.putAll(mapper.readValue(map.trigger, Map))
execution.notifications.addAll(mapper.readValue(map.notifications, List))
execution.initialConfig.putAll(mapper.readValue(map.initialConfig, Map))
} else if (execution instanceof Orchestration) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,27 @@ abstract class ExecutionRepositoryTck<T extends ExecutionRepository> extends Spe
where:
status << ExecutionStatus.values()
}

def "should return task ref for currently running orchestration by correlation id"() {
given:
def execution = orchestration()
execution.trigger['correlationId'] = 'covfefe'
repository.store(execution)
repository.updateStatus(execution.id, RUNNING)

when:
def result = repository.retrieveOrchestrationForCorrelationId('covfefe')

then:
result.id == execution.id

when:
repository.updateStatus(execution.id, SUCCEEDED)
repository.retrieveOrchestrationForCorrelationId('covfefe')

then:
thrown(ExecutionNotFoundException)
}
}

class JedisExecutionRepositorySpec extends ExecutionRepositoryTck<JedisExecutionRepository> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,6 @@

package com.netflix.spinnaker.orca.controllers

import com.netflix.spinnaker.orca.pipeline.util.ArtifactResolver

import javax.servlet.http.HttpServletResponse
import com.fasterxml.jackson.databind.ObjectMapper
import com.netflix.spinnaker.kork.web.exceptions.InvalidRequestException
import com.netflix.spinnaker.kork.web.exceptions.ValidationException
Expand All @@ -29,6 +26,7 @@ import com.netflix.spinnaker.orca.pipeline.OrchestrationLauncher
import com.netflix.spinnaker.orca.pipeline.PipelineLauncher
import com.netflix.spinnaker.orca.pipeline.model.Pipeline
import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository
import com.netflix.spinnaker.orca.pipeline.util.ArtifactResolver
import com.netflix.spinnaker.orca.pipeline.util.ContextParameterProcessor
import com.netflix.spinnaker.orca.webhook.service.WebhookService
import com.netflix.spinnaker.security.AuthenticatedRequest
Expand All @@ -38,6 +36,9 @@ import org.springframework.web.bind.annotation.RequestBody
import org.springframework.web.bind.annotation.RequestMapping
import org.springframework.web.bind.annotation.RequestMethod
import org.springframework.web.bind.annotation.RestController

import javax.servlet.http.HttpServletResponse

import static net.logstash.logback.argument.StructuredArguments.value

@RestController
Expand Down Expand Up @@ -148,7 +149,7 @@ class OperationsController {
pipeline.trigger.parameters[it.name] = pipeline.trigger.parameters.containsKey(it.name) ? pipeline.trigger.parameters[it.name] : it.default
}
}

ArtifactResolver.resolveArtifacts(pipeline)
}

Expand Down Expand Up @@ -181,12 +182,16 @@ class OperationsController {

@RequestMapping(value = "/ops", method = RequestMethod.POST)
Map<String, String> ops(@RequestBody List<Map> input) {
startTask([application: null, name: null, appConfig: null, stages: input])
def execution = [application: null, name: null, appConfig: null, stages: input]
parsePipelineTrigger(executionRepository, buildService, execution)
startTask(execution)
}

@RequestMapping(value = "/ops", consumes = "application/context+json", method = RequestMethod.POST)
Map<String, String> ops(@RequestBody Map input) {
startTask([application: input.application, name: input.description, appConfig: input.appConfig, stages: input.job])
def execution = [application: input.application, name: input.description, appConfig: input.appConfig, stages: input.job, trigger: input.trigger ?: Collections.emptyMap()]
parsePipelineTrigger(executionRepository, buildService, execution)
startTask(execution)
}

@RequestMapping(value = "/webhooks/preconfigured")
Expand Down

0 comments on commit 5aa1af3

Please sign in to comment.