Skip to content

Commit

Permalink
feat(pubsub): Enforce expected artifacts in triggers. (#171)
Browse files Browse the repository at this point in the history
Triggers can specify expected artifacts. This change ensures that
at least one of the artifacts parsed from the message satisfies each
expected artifact specified in the Trigger.
  • Loading branch information
jtk54 committed Sep 18, 2017
1 parent e0bf692 commit 414695a
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@

import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonPOJOBuilder;
import com.netflix.spinnaker.kork.artifacts.model.Artifact;
import lombok.Builder;
import lombok.ToString;
import lombok.Value;
import lombok.experimental.Wither;

import java.util.List;
import java.util.Map;

@JsonDeserialize(builder = Trigger.TriggerBuilder.class)
Expand Down Expand Up @@ -73,6 +75,7 @@ public String toString() {
String secret;
String subscriptionName;
String pubsubType;
List<Artifact> expectedArtifacts;

public Trigger atBuildNumber(final int buildNumber) {
return this.toBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,25 @@
import com.netflix.spinnaker.echo.model.trigger.PubsubEvent;
import com.netflix.spinnaker.echo.model.trigger.TriggerEvent;
import com.netflix.spinnaker.echo.pipelinetriggers.PipelineCache;
import com.netflix.spinnaker.kork.artifacts.model.Artifact;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import lombok.val;
import org.apache.commons.lang.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import rx.Observable;
import rx.functions.Action1;

import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

/**
* Triggers pipelines in _Orca_ when a trigger-enabled pubsub message arrives.
*/
@Component
@Slf4j
public class PubsubEventMonitor extends TriggerMonitor {

public static final String PUBSUB_TRIGGER_TYPE = "pubsub";
Expand Down Expand Up @@ -96,10 +100,25 @@ protected Predicate<Trigger> matchTriggerFor(final TriggerEvent event) {
PubsubEvent pubsubEvent = (PubsubEvent) event;
MessageDescription description = pubsubEvent.getContent().getMessageDescription();

// TODO(jacobkiefer): Need to apply filters specified in triggers here.
return trigger -> trigger.getType().equalsIgnoreCase(PUBSUB_TRIGGER_TYPE)
&& trigger.getPubsubType().equalsIgnoreCase(description.getPubsubType().toString())
&& trigger.getSubscriptionName().equalsIgnoreCase(description.getSubscriptionName());
&& trigger.getSubscriptionName().equalsIgnoreCase(description.getSubscriptionName())
&& anyArtifactsMatchExpected(description.getArtifacts(), trigger);
}

private Boolean anyArtifactsMatchExpected(List<Artifact> messageArtifacts, Trigger trigger) {
List<Artifact> expectedArtifacts = trigger.getExpectedArtifacts();

if (expectedArtifacts == null || expectedArtifacts.isEmpty()) {
return true;
}

if (messageArtifacts.size() > expectedArtifacts.size()) {
log.warn("Parsed message artifacts (size {}) greater than expected artifacts (size {}), continuing trigger anyway", messageArtifacts.size(), expectedArtifacts.size());
}

Predicate<Artifact> expectedArtifactMatch = a -> trigger.getExpectedArtifacts().stream().anyMatch(e -> a.getType().equals(e.getType()) && a.getName().equals(e.getName()));
return messageArtifacts.stream().anyMatch(expectedArtifactMatch);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ import com.netflix.spinnaker.echo.model.Event
import com.netflix.spinnaker.echo.model.pubsub.PubsubType
import com.netflix.spinnaker.echo.pipelinetriggers.monitor.PubsubEventMonitor
import com.netflix.spinnaker.echo.test.RetrofitStubs
import com.netflix.spinnaker.kork.artifacts.model.Artifact
import rx.functions.Action1
import spock.lang.Shared
import spock.lang.Specification
import spock.lang.Subject
import spock.lang.Unroll
Expand All @@ -38,6 +40,8 @@ class PubsubEventMonitorSpec extends Specification implements RetrofitStubs {
counter(*_) >> Stub(Counter)
gauge(*_) >> Integer.valueOf(1)
}
@Shared def goodArtifacts = [new Artifact(name: 'myArtifact', type: 'artifactType')]
@Shared def badArtifacts = [new Artifact(name: 'myBadArtifact', type: 'badArtifactType')]

@Subject
def monitor = new PubsubEventMonitor(pipelineCache, subscriber, registry)
Expand All @@ -57,8 +61,11 @@ class PubsubEventMonitorSpec extends Specification implements RetrofitStubs {
})

where:
event | trigger
createPubsubEvent(PubsubType.GOOGLE, "projects/project/subscriptions/subscription") | enabledGooglePubsubTrigger
event | trigger
createPubsubEvent(PubsubType.GOOGLE, "projects/project/subscriptions/subscription", null) | enabledGooglePubsubTrigger
createPubsubEvent(PubsubType.GOOGLE, "projects/project/subscriptions/subscription", []) | enabledGooglePubsubTrigger
createPubsubEvent(PubsubType.GOOGLE, "projects/project/subscriptions/subscription", goodArtifacts) | enabledGooglePubsubTrigger.withExpectedArtifacts(goodArtifacts)
createPubsubEvent(PubsubType.GOOGLE, "projects/project/subscriptions/subscription", goodArtifacts) | enabledGooglePubsubTrigger // Trigger doesn't care about artifacts.
// TODO(jacobkiefer): Add Kafka cases when that is implemented.
}

Expand All @@ -78,7 +85,7 @@ class PubsubEventMonitorSpec extends Specification implements RetrofitStubs {
disabledGooglePubsubTrigger | "disabled Google pubsub trigger"

pipeline = createPipelineWith(trigger)
event = createPubsubEvent(PubsubType.GOOGLE, "projects/project/subscriptions/subscription")
event = createPubsubEvent(PubsubType.GOOGLE, "projects/project/subscriptions/subscription", [])
// TODO(jacobkiefer): Add Kafka cases when that is implemented.
}

Expand All @@ -97,7 +104,7 @@ class PubsubEventMonitorSpec extends Specification implements RetrofitStubs {
})

where:
event = createPubsubEvent(PubsubType.GOOGLE, "projects/project/subscriptions/subscription")
event = createPubsubEvent(PubsubType.GOOGLE, "projects/project/subscriptions/subscription", [])
pipeline = createPipelineWith(enabledGooglePubsubTrigger, disabledGooglePubsubTrigger)
}

Expand All @@ -113,17 +120,36 @@ class PubsubEventMonitorSpec extends Specification implements RetrofitStubs {
0 * subscriber._

where:
trigger | description
disabledGooglePubsubTrigger | "disabled Google pubsub trigger"
enabledGooglePubsubTrigger.withSubscriptionName("wrongName") | "different subscription name"
enabledGooglePubsubTrigger.withPubsubType("noogle") | "different subscription name"
trigger | description
disabledGooglePubsubTrigger | "disabled Google pubsub trigger"
enabledGooglePubsubTrigger.withSubscriptionName("wrongName") | "different subscription name"
enabledGooglePubsubTrigger.withPubsubType("noogle") | "different subscription name"

pipeline = createPipelineWith(trigger)
event = createPubsubEvent(PubsubType.GOOGLE, "projects/project/subscriptions/subscription")
event = createPubsubEvent(PubsubType.GOOGLE, "projects/project/subscriptions/subscription", [])
}

@Unroll
def "does not trigger a pipeline that has an enabled bitbucket trigger with missing #field"() {
def "does not trigger #description pipelines containing artifacts for Google pubsub"() {
given:
pipelineCache.getPipelines() >> [pipeline]

when:
monitor.processEvent(objectMapper.convertValue(event, Event))

then:
0 * subscriber._

where:
trigger | description
enabledGooglePubsubTrigger.withExpectedArtifacts(badArtifacts) | "non-matching artifact in message"

pipeline = createPipelineWith(trigger)
event = createPubsubEvent(PubsubType.GOOGLE, "projects/project/subscriptions/subscription", goodArtifacts)
}

@Unroll
def "does not trigger a pipeline that has an enabled pubsub trigger with missing #field"() {
given:
pipelineCache.getPipelines() >> [badPipeline, goodPipeline]

Expand All @@ -138,7 +164,7 @@ class PubsubEventMonitorSpec extends Specification implements RetrofitStubs {
enabledGooglePubsubTrigger.withSubscriptionName(null) | "subscriptionName"
enabledGooglePubsubTrigger.withPubsubType(null) | "pubsubType"

event = createPubsubEvent(PubsubType.GOOGLE, "projects/project/subscriptions/subscription")
event = createPubsubEvent(PubsubType.GOOGLE, "projects/project/subscriptions/subscription", [])
goodPipeline = createPipelineWith(enabledGooglePubsubTrigger)
badPipeline = createPipelineWith(trigger)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import com.netflix.spinnaker.echo.model.Trigger
import com.netflix.spinnaker.echo.model.pubsub.MessageDescription
import com.netflix.spinnaker.echo.model.pubsub.PubsubType
import com.netflix.spinnaker.echo.model.trigger.*
import com.netflix.spinnaker.kork.artifacts.model.Artifact
import retrofit.RetrofitError
import retrofit.client.Response

Expand Down Expand Up @@ -84,13 +85,14 @@ trait RetrofitStubs {
return res
}

PubsubEvent createPubsubEvent(PubsubType pubsubType, String subscriptionName) {
PubsubEvent createPubsubEvent(PubsubType pubsubType, String subscriptionName, List<Artifact> artifacts) {
def res = new PubsubEvent()

def description = MessageDescription.builder()
.pubsubType(pubsubType)
.ackDeadlineMillis(10000)
.subscriptionName(subscriptionName)
.artifacts(artifacts)
.build()

def content = new PubsubEvent.Content()
Expand Down

0 comments on commit 414695a

Please sign in to comment.