Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: make oidc discovery url configurable #8145

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions cmd/broker/filter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ func main() {
// Watch the observability config map and dynamically update request logs.
configMapWatcher.Watch(logging.ConfigMapName(), logging.UpdateLevelFromConfigMap(sl, atomicLevel, component))

trustBundleConfigMapLister := configmapinformer.Get(ctx, eventingtls.TrustBundleLabelSelector).Lister().ConfigMaps(system.Namespace())

var featureStore *feature.Store
var handler *filter.Handler

Expand All @@ -134,7 +136,6 @@ func main() {
}
handler.EventTypeCreator = autoCreate
}

})
featureStore.WatchConfigs(configMapWatcher)

Expand All @@ -154,9 +155,8 @@ func main() {
oidcTokenProvider := auth.NewOIDCTokenProvider(ctx)
// We are running both the receiver (takes messages in from the Broker) and the dispatcher (send
// the messages to the triggers' subscribers) in this binary.
authVerifier := auth.NewVerifier(ctx, eventpolicyinformer.Get(ctx).Lister())
trustBundleConfigMapInformer := configmapinformer.Get(ctx, eventingtls.TrustBundleLabelSelector).Lister().ConfigMaps(system.Namespace())
handler, err = filter.NewHandler(logger, authVerifier, oidcTokenProvider, triggerinformer.Get(ctx), brokerinformer.Get(ctx), subscriptioninformer.Get(ctx), reporter, trustBundleConfigMapInformer, ctxFunc)
authVerifier := auth.NewVerifier(ctx, eventpolicyinformer.Get(ctx).Lister(), trustBundleConfigMapLister, configMapWatcher)
handler, err = filter.NewHandler(logger, authVerifier, oidcTokenProvider, triggerinformer.Get(ctx), brokerinformer.Get(ctx), subscriptioninformer.Get(ctx), reporter, trustBundleConfigMapLister, ctxFunc)
if err != nil {
logger.Fatal("Error creating Handler", zap.Error(err))
}
Expand Down
7 changes: 4 additions & 3 deletions cmd/broker/ingress/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,8 @@ func main() {
logger.Fatal("Error setting up trace publishing", zap.Error(err))
}

trustBundleConfigMapLister := configmapinformer.Get(ctx, eventingtls.TrustBundleLabelSelector).Lister().ConfigMaps(system.Namespace())

var featureStore *feature.Store
var handler *ingress.Handler

Expand All @@ -168,9 +170,8 @@ func main() {
reporter := ingress.NewStatsReporter(env.ContainerName, kmeta.ChildName(env.PodName, uuid.New().String()))

oidcTokenProvider := auth.NewOIDCTokenProvider(ctx)
authVerifier := auth.NewVerifier(ctx, eventpolicyinformer.Get(ctx).Lister())
trustBundleConfigMapInformer := configmapinformer.Get(ctx, eventingtls.TrustBundleLabelSelector).Lister().ConfigMaps(system.Namespace())
handler, err = ingress.NewHandler(logger, reporter, broker.TTLDefaulter(logger, int32(env.MaxTTL)), brokerInformer, authVerifier, oidcTokenProvider, trustBundleConfigMapInformer, ctxFunc)
authVerifier := auth.NewVerifier(ctx, eventpolicyinformer.Get(ctx).Lister(), trustBundleConfigMapLister, configMapWatcher)
handler, err = ingress.NewHandler(logger, reporter, broker.TTLDefaulter(logger, int32(env.MaxTTL)), brokerInformer, authVerifier, oidcTokenProvider, trustBundleConfigMapLister, ctxFunc)
if err != nil {
logger.Fatal("Error creating Handler", zap.Error(err))
}
Expand Down
18 changes: 13 additions & 5 deletions cmd/jobsink/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ import (
"net/http"
"strings"

configmapinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/configmap/filtered"
filteredFactory "knative.dev/pkg/client/injection/kube/informers/factory/filtered"

"github.com/cloudevents/sdk-go/v2/binding"
cehttp "github.com/cloudevents/sdk-go/v2/protocol/http"
"go.uber.org/zap"
Expand Down Expand Up @@ -70,9 +73,13 @@ func main() {

cfg := injection.ParseAndGetRESTConfigOrDie()
ctx = injection.WithConfig(ctx, cfg)
ctx = filteredFactory.WithSelectors(ctx,
eventingtls.TrustBundleLabelSelector,
)

ctx, informers := injection.Default.SetupInformers(ctx, cfg)
ctx = injection.WithConfig(ctx, cfg)

loggingConfig, err := cmdbroker.GetLoggingConfig(ctx, system.Namespace(), logging.ConfigMapName())
if err != nil {
log.Fatal("Error loading/parsing logging configuration:", err)
Expand Down Expand Up @@ -104,21 +111,22 @@ func main() {

logger.Info("Starting the JobSink Ingress")

featureStore := feature.NewStore(logging.FromContext(ctx).Named("feature-config-store"), func(name string, value interface{}) {
logger.Info("Updated", zap.String("name", name), zap.Any("value", value))
})
trustBundleConfigMapLister := configmapinformer.Get(ctx, eventingtls.TrustBundleLabelSelector).Lister().ConfigMaps(system.Namespace())
var h *Handler

featureStore := feature.NewStore(logging.FromContext(ctx).Named("feature-config-store"))
featureStore.WatchConfigs(configMapWatcher)

// Decorate contexts with the current state of the feature config.
ctxFunc := func(ctx context.Context) context.Context {
return logging.WithLogger(featureStore.ToContext(ctx), sl)
}

h := &Handler{
h = &Handler{
k8s: kubeclient.Get(ctx),
lister: jobsink.Get(ctx).Lister(),
withContext: ctxFunc,
authVerifier: auth.NewVerifier(ctx, eventpolicyinformer.Get(ctx).Lister()),
authVerifier: auth.NewVerifier(ctx, eventpolicyinformer.Get(ctx).Lister(), trustBundleConfigMapLister, configMapWatcher),
}

tlsConfig, err := getServerTLSConfig(ctx)
Expand Down
19 changes: 18 additions & 1 deletion pkg/apis/feature/features.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ const (
// This configuration is applied when there is no EventPolicy with a "to" referencing a given
// resource.
AuthorizationAllowSameNamespace Flag = "Allow-Same-Namespace"

// DefaultOIDCDiscoveryURL is the default OIDC Discovery URL used in most Kubernetes clusters.
DefaultOIDCDiscoveryBaseURL Flag = "https://kubernetes.default.svc"
)

// Flags is a map containing all the enabled/disabled flags for the experimental features.
Expand All @@ -81,6 +84,7 @@ func newDefaults() Flags {
EvenTypeAutoCreate: Disabled,
NewAPIServerFilters: Disabled,
AuthorizationDefaultMode: AuthorizationAllowSameNamespace,
OIDCDiscoveryBaseURL: DefaultOIDCDiscoveryBaseURL,
}
}

Expand Down Expand Up @@ -134,6 +138,19 @@ func (e Flags) IsAuthorizationDefaultModeSameNamespace() bool {
return e != nil && e[AuthorizationDefaultMode] == AuthorizationAllowSameNamespace
}

func (e Flags) OIDCDiscoveryBaseURL() string {
if e == nil {
return string(DefaultOIDCDiscoveryBaseURL)
}

discoveryUrl, ok := e[OIDCDiscoveryBaseURL]
if !ok {
return string(DefaultOIDCDiscoveryBaseURL)
}

return string(discoveryUrl)
}

func (e Flags) String() string {
return fmt.Sprintf("%+v", map[string]Flag(e))
}
Expand Down Expand Up @@ -183,7 +200,7 @@ func NewFlagsConfigFromMap(data map[string]string) (Flags, error) {
flags[sanitizedKey] = AuthorizationDenyAll
} else if sanitizedKey == AuthorizationDefaultMode && strings.EqualFold(v, string(AuthorizationAllowSameNamespace)) {
flags[sanitizedKey] = AuthorizationAllowSameNamespace
} else if strings.Contains(k, NodeSelectorLabel) {
} else if strings.Contains(k, NodeSelectorLabel) || sanitizedKey == OIDCDiscoveryBaseURL {
flags[sanitizedKey] = Flag(v)
} else {
flags[k] = Flag(v)
Expand Down
2 changes: 2 additions & 0 deletions pkg/apis/feature/features_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ func TestGetFlags(t *testing.T) {
nodeSelector := flags.NodeSelector()
expectedNodeSelector := map[string]string{"testkey": "testvalue", "testkey1": "testvalue1", "testkey2": "testvalue2"}
require.Equal(t, expectedNodeSelector, nodeSelector)

require.Equal(t, flags.OIDCDiscoveryBaseURL(), "https://oidc.eks.eu-west-1.amazonaws.com/id/1")
}

func TestShouldNotOverrideDefaults(t *testing.T) {
Expand Down
1 change: 1 addition & 0 deletions pkg/apis/feature/flag_names.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,5 @@ const (
CrossNamespaceEventLinks = "cross-namespace-event-links"
NewAPIServerFilters = "new-apiserversource-filters"
AuthorizationDefaultMode = "default-authorization-mode"
OIDCDiscoveryBaseURL = "oidc-discovery-base-url"
)
1 change: 1 addition & 0 deletions pkg/apis/feature/testdata/config-features.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,4 @@ data:
apiserversources-nodeselector-testkey: testvalue
apiserversources-nodeselector-testkey1: testvalue1
apiserversources-nodeselector-testkey2: testvalue2
oidc-discovery-base-url: "https://oidc.eks.eu-west-1.amazonaws.com/id/1"
96 changes: 70 additions & 26 deletions pkg/auth/verifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,18 @@ import (
"encoding/json"
"fmt"
"io"
"net"
"net/http"
"strings"
"time"

"go.opencensus.io/plugin/ochttp"
corev1listers "k8s.io/client-go/listers/core/v1"
"knative.dev/eventing/pkg/eventingtls"
"knative.dev/pkg/configmap"
"knative.dev/pkg/network"
"knative.dev/pkg/tracing/propagation/tracecontextb3"

duckv1 "knative.dev/eventing/pkg/apis/duck/v1"
"knative.dev/eventing/pkg/client/listers/eventing/v1alpha1"

Expand All @@ -41,15 +49,12 @@ import (
"knative.dev/pkg/logging"
)

const (
kubernetesOIDCDiscoveryBaseURL = "https://kubernetes.default.svc"
)

type Verifier struct {
logger *zap.SugaredLogger
restConfig *rest.Config
provider *oidc.Provider
eventPolicyLister v1alpha1.EventPolicyLister
logger *zap.SugaredLogger
restConfig *rest.Config
provider *oidc.Provider
eventPolicyLister v1alpha1.EventPolicyLister
trustBundleConfigMapLister corev1listers.ConfigMapNamespaceLister
}

type IDToken struct {
Expand All @@ -61,14 +66,24 @@ type IDToken struct {
AccessTokenHash string
}

func NewVerifier(ctx context.Context, eventPolicyLister listerseventingv1alpha1.EventPolicyLister) *Verifier {
func NewVerifier(ctx context.Context, eventPolicyLister listerseventingv1alpha1.EventPolicyLister, trustBundleConfigMapLister corev1listers.ConfigMapNamespaceLister, cmw configmap.Watcher) *Verifier {
tokenHandler := &Verifier{
logger: logging.FromContext(ctx).With("component", "oidc-token-handler"),
restConfig: injection.GetConfig(ctx),
eventPolicyLister: eventPolicyLister,
logger: logging.FromContext(ctx).With("component", "oidc-token-handler"),
restConfig: injection.GetConfig(ctx),
eventPolicyLister: eventPolicyLister,
trustBundleConfigMapLister: trustBundleConfigMapLister,
}

if err := tokenHandler.initOIDCProvider(ctx); err != nil {
featureStore := feature.NewStore(logging.FromContext(ctx).Named("feature-config-store"), func(name string, value interface{}) {
if features, ok := value.(feature.Flags); ok {
if err := tokenHandler.initOIDCProvider(ctx, features); err != nil {
tokenHandler.logger.Error(fmt.Sprintf("could not initialize provider after config update. You can ignore this message, when the %s feature is disabled", feature.OIDCAuthentication), zap.Error(err))
}
}
})
featureStore.WatchConfigs(cmw)

if err := tokenHandler.initOIDCProvider(ctx, featureStore.Load()); err != nil {
tokenHandler.logger.Error(fmt.Sprintf("could not initialize provider. You can ignore this message, when the %s feature is disabled", feature.OIDCAuthentication), zap.Error(err))
}

Expand Down Expand Up @@ -219,29 +234,33 @@ func (v *Verifier) verifyJWT(ctx context.Context, jwt, audience string) (*IDToke
}, nil
}

func (v *Verifier) initOIDCProvider(ctx context.Context) error {
discovery, err := v.getKubernetesOIDCDiscovery()
func (v *Verifier) initOIDCProvider(ctx context.Context, features feature.Flags) error {
httpClient, err := v.getHTTPClient(features)
if err != nil {
return fmt.Errorf("could not get HTTP client: %w", err)
}

discovery, err := v.getKubernetesOIDCDiscovery(features, httpClient)
if err != nil {
return fmt.Errorf("could not load Kubernetes OIDC discovery information: %w", err)
}

if discovery.Issuer != kubernetesOIDCDiscoveryBaseURL {
if discovery.Issuer != features.OIDCDiscoveryBaseURL() {
// in case we have another issuer as the api server:
ctx = oidc.InsecureIssuerURLContext(ctx, discovery.Issuer)
}

httpClient, err := v.getHTTPClientForKubeAPIServer()
if err != nil {
return fmt.Errorf("could not get HTTP client with TLS certs of API server: %w", err)
}
ctx = oidc.ClientContext(ctx, httpClient)

// get OIDC provider
v.provider, err = oidc.NewProvider(ctx, kubernetesOIDCDiscoveryBaseURL)
provider, err := oidc.NewProvider(ctx, features.OIDCDiscoveryBaseURL())
if err != nil {
return fmt.Errorf("could not get OIDC provider: %w", err)
}

// provider is valid, update it
v.provider = provider
Comment on lines +261 to +262
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As we potentially update this from multiple goroutines, can we add a mutex around it?


v.logger.Debug("updated OIDC provider config", zap.Any("discovery-config", discovery))

return nil
Expand All @@ -256,13 +275,38 @@ func (v *Verifier) getHTTPClientForKubeAPIServer() (*http.Client, error) {
return client, nil
}

func (v *Verifier) getKubernetesOIDCDiscovery() (*openIDMetadata, error) {
client, err := v.getHTTPClientForKubeAPIServer()
if err != nil {
return nil, fmt.Errorf("could not get HTTP client for API server: %w", err)
func (v *Verifier) getHTTPClient(features feature.Flags) (*http.Client, error) {
if features.OIDCDiscoveryBaseURL() == "https://kubernetes.default.svc" {
return v.getHTTPClientForKubeAPIServer()
}

var base = http.DefaultTransport.(*http.Transport).Clone()

clientConfig := eventingtls.ClientConfig{
TrustBundleConfigMapLister: v.trustBundleConfigMapLister,
}

resp, err := client.Get(kubernetesOIDCDiscoveryBaseURL + "/.well-known/openid-configuration")
base.DialTLSContext = func(ctx context.Context, net, addr string) (net.Conn, error) {
tlsConfig, err := eventingtls.GetTLSClientConfig(clientConfig)
if err != nil {
return nil, fmt.Errorf("could not get tls client config: %w", err)
}
return network.DialTLSWithBackOff(ctx, net, addr, tlsConfig)
}

client := &http.Client{
// Add output tracing.
Transport: &ochttp.Transport{
Base: base,
Propagation: tracecontextb3.TraceContextEgress,
},
}

return client, nil
}

func (v *Verifier) getKubernetesOIDCDiscovery(features feature.Flags, client *http.Client) (*openIDMetadata, error) {
resp, err := client.Get(features.OIDCDiscoveryBaseURL() + "/.well-known/openid-configuration")
if err != nil {
return nil, fmt.Errorf("could not get response: %w", err)
}
Expand Down
Loading
Loading