Skip to content

Commit

Permalink
Refactor BackendManager / BackendStorage.
Browse files Browse the repository at this point in the history
This also fixes #294
  • Loading branch information
jkh52 committed May 10, 2024
1 parent eea669e commit 224f24f
Show file tree
Hide file tree
Showing 11 changed files with 794 additions and 866 deletions.
239 changes: 71 additions & 168 deletions pkg/server/backend_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,13 @@ import (
"context"
"fmt"
"io"
"math/rand"
"slices"
"strings"
"sync"
"time"

"google.golang.org/grpc/metadata"
"k8s.io/klog/v2"

commonmetrics "sigs.k8s.io/apiserver-network-proxy/konnectivity-client/pkg/common/metrics"
client "sigs.k8s.io/apiserver-network-proxy/konnectivity-client/proto/client"
"sigs.k8s.io/apiserver-network-proxy/konnectivity-client/proto/client"
"sigs.k8s.io/apiserver-network-proxy/pkg/server/metrics"
"sigs.k8s.io/apiserver-network-proxy/proto/agent"
"sigs.k8s.io/apiserver-network-proxy/proto/header"
Expand Down Expand Up @@ -61,6 +57,7 @@ func (ps ProxyStrategy) String() string {
return "defaultRoute"
}
panic(fmt.Sprintf("unhandled ProxyStrategy: %d", ps))
}

func ParseProxyStrategy(s string) (ProxyStrategy, error) {
switch s {
Expand Down Expand Up @@ -195,176 +192,105 @@ func NewBackend(conn agent.AgentService_ConnectServer) (*Backend, error) {
return &Backend{conn: conn, id: agentID, idents: agentIdentifiers}, nil
}

// BackendStorage is an interface to manage the storage of the backend
// connections, i.e., get, add and remove
type BackendStorage interface {
// addBackend adds a backend.
addBackend(identifier string, idType header.IdentifierType, backend *Backend)
// removeBackend removes a backend.
removeBackend(identifier string, idType header.IdentifierType, backend *Backend)
// NumBackends returns the number of backends.
NumBackends() int
}

// BackendManager is an interface to manage backend connections, i.e.,
// connection to the proxy agents.
type BackendManager interface {
// Backend returns a single backend.
// WARNING: the context passed to the function should be a session-scoped
// context instead of a request-scoped context, as the backend manager will
// pick a backend for every tunnel session and each tunnel session may
// contains multiple requests.
Backend(ctx context.Context) (*Backend, error)
// Backend returns a backend connection according to proxy strategies.
Backend(addr string) (*Backend, error)
// AddBackend adds a backend.
AddBackend(backend *Backend)
// RemoveBackend adds a backend.
RemoveBackend(backend *Backend)
BackendStorage
// NumBackends returns the number of backends.
NumBackends() int
ReadinessManager
}

var _ BackendManager = &DefaultBackendManager{}

// DefaultBackendManager is the default backend manager.
type DefaultBackendManager struct {
*DefaultBackendStorage
}

func (dbm *DefaultBackendManager) Backend(_ context.Context) (*Backend, error) {
klog.V(5).InfoS("Get a random backend through the DefaultBackendManager")
return dbm.DefaultBackendStorage.GetRandomBackend()
}

func (dbm *DefaultBackendManager) AddBackend(backend *Backend) {
agentID := backend.GetAgentID()
klog.V(5).InfoS("Add the agent to DefaultBackendManager", "agentID", agentID)
dbm.addBackend(agentID, header.UID, backend)
}

func (dbm *DefaultBackendManager) RemoveBackend(backend *Backend) {
agentID := backend.GetAgentID()
klog.V(5).InfoS("Remove the agent from the DefaultBackendManager", "agentID", agentID)
dbm.removeBackend(agentID, header.UID, backend)
}

// DefaultBackendStorage is the default backend storage.
type DefaultBackendStorage struct {
mu sync.RWMutex //protects the following
// A map between agentID and its grpc connections.
// For a given agent, ProxyServer prefers backends[agentID][0] to send
// traffic, because backends[agentID][1:] are more likely to be closed
// by the agent to deduplicate connections to the same server.
//
// TODO: fix documentation. This is not always agentID, e.g. in
// the case of DestHostBackendManager.
backends map[string][]*Backend
// agentID is tracked in this slice to enable randomly picking an
// agentID in the Backend() method. There is no reliable way to
// randomly pick a key from a map (in this case, the backends) in
// Golang.
agentIDs []string
random *rand.Rand
// idTypes contains the valid identifier types for this
// DefaultBackendStorage. The DefaultBackendStorage may only tolerate certain
// types of identifiers when associating to a specific BackendManager,
// e.g., when associating to the DestHostBackendManager, it can only use the
// identifiers of types, IPv4, IPv6 and Host.
idTypes []header.IdentifierType
proxyStrategies []ProxyStrategy

// All backends by agentID.
all BackendStorage
// All backends by host identifier(s). Only used with ProxyStrategyDestHost.
byHost BackendStorage
// All default-route backends, by agentID. Only used with ProxyStrategyDefaultRoute.
byDefaultRoute BackendStorage
}

// NewDefaultBackendManager returns a DefaultBackendManager.
func NewDefaultBackendManager() *DefaultBackendManager {
func NewDefaultBackendManager(proxyStrategies []ProxyStrategy) *DefaultBackendManager {
metrics.Metrics.SetBackendCount(0)
return &DefaultBackendManager{
DefaultBackendStorage: NewDefaultBackendStorage(
[]header.IdentifierType{header.UID})}
proxyStrategies: proxyStrategies,
all: NewDefaultBackendStorage(),
byHost: NewDefaultBackendStorage(),
byDefaultRoute: NewDefaultBackendStorage(),
}
}

// NewDefaultBackendStorage returns a DefaultBackendStorage
func NewDefaultBackendStorage(idTypes []header.IdentifierType) *DefaultBackendStorage {
// Set an explicit value, so that the metric is emitted even when
// no agent ever successfully connects.
metrics.Metrics.SetBackendCount(0)
return &DefaultBackendStorage{
backends: make(map[string][]*Backend),
random: rand.New(rand.NewSource(time.Now().UnixNano())),
idTypes: idTypes,
} /* #nosec G404 */
func (s *DefaultBackendManager) Backend(addr string) (*Backend, error) {
for _, strategy := range s.proxyStrategies {
var b *Backend
var e error
e = &ErrNotFound{}
switch strategy {
case ProxyStrategyDefault:
b, e = s.all.RandomBackend()
case ProxyStrategyDestHost:
b, e = s.byHost.GetBackend(addr)
case ProxyStrategyDefaultRoute:
b, e = s.byDefaultRoute.RandomBackend()
}
if e == nil {
return b, nil
}
}
return nil, &ErrNotFound{}
}

func containIDType(idTypes []header.IdentifierType, idType header.IdentifierType) bool {
return slices.Contains(idTypes, idType)
func hostIdentifiers(backend *Backend) []string {
hosts := []string{}
hosts = append(hosts, backend.GetAgentIdentifiers().IPv4...)
hosts = append(hosts, backend.GetAgentIdentifiers().IPv6...)
hosts = append(hosts, backend.GetAgentIdentifiers().Host...)
return hosts
}

// addBackend adds a backend.
func (s *DefaultBackendStorage) addBackend(identifier string, idType header.IdentifierType, backend *Backend) {
if !containIDType(s.idTypes, idType) {
klog.V(4).InfoS("fail to add backend", "backend", identifier, "error", &ErrWrongIDType{idType, s.idTypes})
return
func (s *DefaultBackendManager) AddBackend(backend *Backend) {
agentID := backend.GetAgentID()
count := s.all.AddBackend([]string{agentID}, backend)
if slices.Contains(s.proxyStrategies, ProxyStrategyDestHost) {
idents := hostIdentifiers(backend)
s.byHost.AddBackend(idents, backend)
}
klog.V(5).InfoS("Register backend for agent", "agentID", identifier)
s.mu.Lock()
defer s.mu.Unlock()
_, ok := s.backends[identifier]
if ok {
for _, b := range s.backends[identifier] {
if b == backend {
klog.V(1).InfoS("This should not happen. Adding existing backend for agent", "agentID", identifier)
return
}
if slices.Contains(s.proxyStrategies, ProxyStrategyDefaultRoute) {
if backend.GetAgentIdentifiers().DefaultRoute {
s.byDefaultRoute.AddBackend([]string{agentID}, backend)
}
s.backends[identifier] = append(s.backends[identifier], backend)
return
}
s.backends[identifier] = []*Backend{backend}
metrics.Metrics.SetBackendCount(len(s.backends))
s.agentIDs = append(s.agentIDs, identifier)
metrics.Metrics.SetBackendCount(count)
}

// removeBackend removes a backend.
func (s *DefaultBackendStorage) removeBackend(identifier string, idType header.IdentifierType, backend *Backend) {
if !containIDType(s.idTypes, idType) {
klog.ErrorS(&ErrWrongIDType{idType, s.idTypes}, "fail to remove backend")
return
}
klog.V(5).InfoS("Remove connection for agent", "agentID", identifier)
s.mu.Lock()
defer s.mu.Unlock()
backends, ok := s.backends[identifier]
if !ok {
klog.V(1).InfoS("Cannot find agent in backends", "identifier", identifier)
return
}
var found bool
for i, b := range backends {
if b == backend {
s.backends[identifier] = append(s.backends[identifier][:i], s.backends[identifier][i+1:]...)
if i == 0 && len(s.backends[identifier]) != 0 {
klog.V(1).InfoS("This should not happen. Removed connection that is not the first connection", "agentID", identifier)
}
found = true
}
func (s *DefaultBackendManager) RemoveBackend(backend *Backend) {
agentID := backend.GetAgentID()
count := s.all.RemoveBackend([]string{agentID}, backend)
if slices.Contains(s.proxyStrategies, ProxyStrategyDestHost) {
idents := hostIdentifiers(backend)
s.byHost.RemoveBackend(idents, backend)
}
if len(s.backends[identifier]) == 0 {
delete(s.backends, identifier)
for i := range s.agentIDs {
if s.agentIDs[i] == identifier {
s.agentIDs[i] = s.agentIDs[len(s.agentIDs)-1]
s.agentIDs = s.agentIDs[:len(s.agentIDs)-1]
break
}
if slices.Contains(s.proxyStrategies, ProxyStrategyDefaultRoute) {
if backend.GetAgentIdentifiers().DefaultRoute {
s.byDefaultRoute.RemoveBackend([]string{agentID}, backend)
}
}
if !found {
klog.V(1).InfoS("Could not find connection matching identifier to remove", "agentID", identifier, "idType", idType)
}
metrics.Metrics.SetBackendCount(len(s.backends))
metrics.Metrics.SetBackendCount(count)
}

// NumBackends resturns the number of available backends
func (s *DefaultBackendStorage) NumBackends() int {
s.mu.RLock()
defer s.mu.RUnlock()
return len(s.backends)
func (s *DefaultBackendManager) NumBackends() int {
return s.all.NumKeys()
}

// ErrNotFound indicates that no backend can be found.
Expand All @@ -375,32 +301,9 @@ func (e *ErrNotFound) Error() string {
return "No agent available"
}

type ErrWrongIDType struct {
got header.IdentifierType
expect []header.IdentifierType
}

func (e *ErrWrongIDType) Error() string {
return fmt.Sprintf("incorrect id type: got %s, expect %s", e.got, e.expect)
}

func ignoreNotFound(err error) error {
if _, ok := err.(*ErrNotFound); ok {
return nil
}
return err
}

// GetRandomBackend returns a random backend connection from all connected agents.
func (s *DefaultBackendStorage) GetRandomBackend() (*Backend, error) {
s.mu.Lock()
defer s.mu.Unlock()
if len(s.backends) == 0 {
return nil, &ErrNotFound{}
func (s *DefaultBackendManager) Ready() (bool, string) {
if s.NumBackends() == 0 {
return false, "no connection to any proxy agent"
}
agentID := s.agentIDs[s.random.Intn(len(s.agentIDs))]
klog.V(5).InfoS("Pick agent as backend", "agentID", agentID)
// always return the first connection to an agent, because the agent
// will close later connections if there are multiple.
return s.backends[agentID][0], nil
return true, ""
}
Loading

0 comments on commit 224f24f

Please sign in to comment.