Skip to content

Commit

Permalink
Prevented fatal crashes when k8s services api is unavailable; updated…
Browse files Browse the repository at this point in the history
… to go 1.20; addressed linter issues; added info to logging (#3)
  • Loading branch information
jvyelle committed Jul 28, 2023
1 parent 3c9e1ac commit ec1180b
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 41 deletions.
60 changes: 37 additions & 23 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,35 +1,49 @@
module github.com/norbertvannobelen/kube-grpc

go 1.18
go 1.20

require (
google.golang.org/grpc v1.19.0
k8s.io/api v0.18.2
k8s.io/apimachinery v0.18.2
k8s.io/client-go v0.18.2
google.golang.org/grpc v1.57.0
k8s.io/api v0.27.4
k8s.io/apimachinery v0.27.4
k8s.io/client-go v0.27.4
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/gogo/protobuf v1.3.1 // indirect
github.com/golang/protobuf v1.3.2 // indirect
github.com/emicklei/go-restful/v3 v3.9.0 // indirect
github.com/go-logr/logr v1.2.3 // indirect
github.com/go-openapi/jsonpointer v0.19.6 // indirect
github.com/go-openapi/jsonreference v0.20.1 // indirect
github.com/go-openapi/swag v0.22.3 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/gnostic v0.5.7-v3refs // indirect
github.com/google/go-cmp v0.5.9 // indirect
github.com/google/gofuzz v1.1.0 // indirect
github.com/googleapis/gnostic v0.1.0 // indirect
github.com/json-iterator/go v1.1.8 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.1 // indirect
golang.org/x/crypto v0.0.0-20200220183623-bac4c82f6975 // indirect
golang.org/x/net v0.0.0-20191004110552-13f9640d40b9 // indirect
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45 // indirect
golang.org/x/sys v0.0.0-20191022100944-742c48ecaeb7 // indirect
golang.org/x/text v0.3.2 // indirect
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4 // indirect
google.golang.org/appengine v1.5.0 // indirect
google.golang.org/genproto v0.0.0-20190418145605-e7d98fc518a7 // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
golang.org/x/net v0.9.0 // indirect
golang.org/x/oauth2 v0.7.0 // indirect
golang.org/x/sys v0.7.0 // indirect
golang.org/x/term v0.7.0 // indirect
golang.org/x/text v0.9.0 // indirect
golang.org/x/time v0.0.0-20220210224613-90d013bbcef8 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230525234030-28d5490b6b19 // indirect
google.golang.org/protobuf v1.30.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v2 v2.2.8 // indirect
k8s.io/klog v1.0.0 // indirect
k8s.io/utils v0.0.0-20200324210504-a9aa75ae1b89 // indirect
sigs.k8s.io/structured-merge-diff/v3 v3.0.0 // indirect
sigs.k8s.io/yaml v1.2.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/klog/v2 v2.90.1 // indirect
k8s.io/kube-openapi v0.0.0-20230501164219-8b0f38b5fd1f // indirect
k8s.io/utils v0.0.0-20230209194617-a36077c30491 // indirect
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.2.3 // indirect
sigs.k8s.io/yaml v1.3.0 // indirect
)
40 changes: 22 additions & 18 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
Expand Down Expand Up @@ -62,11 +63,11 @@ func init() {
log.SetFlags(log.LstdFlags | log.Lshortfile)
config, err := rest.InClusterConfig()
if err != nil {
log.Fatalf("ERROR: init(): Could not get kube config in cluster. Error:" + err.Error())
log.Fatalf("(kube-grpc) FATAL: init(): Could not get kube config in cluster. Error: %v", err)
}
clientset, err = kubernetes.NewForConfig(config)
if err != nil {
log.Fatalf("ERROR: init(): Could not connect to kube cluster with config. Error:" + err.Error())
log.Fatalf("(kube-grpc) FATAL: init(): Could not connect to kube cluster with config. Error: %v", err)
}
poolManager()
}
Expand Down Expand Up @@ -105,7 +106,7 @@ func healthCheck() {
err := f.Ping(grpcConn.GrpcConnection)
if err != nil {
// Add to dirtyConnections channel:
log.Printf("INFO: healthcheck(): Failed to ping %s at ip %s",
log.Printf("(kube-grpc) INFO: healthcheck(): Failed to ping %s at ip %s",
grpcConn.serviceName, grpcConn.connectionIP)
dirtyConnections <- grpcConn
}
Expand Down Expand Up @@ -134,7 +135,7 @@ func cleanConnections() {
break
}
}
log.Printf("INFO: cleanConnections(): Pool %s after clean: %v", v.serviceName, conns)
log.Printf("(kube-grpc) INFO: cleanConnections(): Pool %s after clean: %v", v.serviceName, conns)
mutex.Unlock()
}
}
Expand Down Expand Up @@ -184,8 +185,7 @@ func Pool(serviceName string, f GrpcKubeBalancer) ([]*GrpcConnection, interface{
connectionCache[serviceName] = currentConnection
}
if currentConnection.nConnections == 0 {
var err error
err = initCurrentConnection(serviceName, currentConnection)
err := initCurrentConnection(serviceName, currentConnection)
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -220,7 +220,7 @@ func initCurrentConnection(serviceName string, currentConnection *connection) er
switch err.Error() {
case "K8S interaction not possible, non-retryable":
return err
case "No connections made, retry later":
case "no connections made, retry later":
// Sleep a second (which is about a lifetime in well configured system)
time.Sleep(time.Second)
continue
Expand All @@ -238,17 +238,17 @@ func updateConnectionPool(serviceName string, currentConnection *connection, loc
// Chat with k8s for service and pod information, slow not blocking action
svc, namespace, err := getService(serviceName, clientset.CoreV1())
if err != nil {
log.Printf("ERROR: updateConnectionPool(): Problem updating pool for service %s. Error %v", serviceName, err)
log.Printf("(kube-grpc) ERROR: updateConnectionPool(): Problem updating pool for service %s. Error %v", serviceName, err)
return errors.New("K8S interaction not possible, non-retryable")
}
pods, podErr := getPodsForSvc(svc, namespace, clientset.CoreV1())
if podErr != nil {
log.Printf("ERROR: updateConnectionPool(): Problem updating pool for service %s. Can not get pods. Error %v",
log.Printf("(kube-grpc) ERROR: updateConnectionPool(): Problem updating pool for service %s. Can not get pods. Error %v",
serviceName, err)
return errors.New("K8S interaction not possible, non-retryable")
}

log.Printf("INFO: updateConnectionPool(): %d pods listed by k8s for service %s", len(pods.Items), serviceName)
log.Printf("(kube-grpc) INFO: updateConnectionPool(): %d pods listed by k8s for service %s", len(pods.Items), serviceName)

// Evict from pool
// Disconnect locking reads and eviction channel:
Expand All @@ -262,13 +262,13 @@ func updateConnectionPool(serviceName string, currentConnection *connection, loc
evict := true
for _, pod := range pods.Items {
if p.connectionIP == pod.Status.PodIP {
log.Printf("INFO: updateConnectionPool(): Not evicting %s for %s", p.connectionIP, p.serviceName)
log.Printf("(kube-grpc) INFO: updateConnectionPool(): Not evicting %s for %s", p.connectionIP, p.serviceName)
evict = false
break
}
}
if evict {
log.Printf("INFO: updateConnectionPool(): Evicting %s for %s", p.connectionIP, p.serviceName)
log.Printf("(kube-grpc) INFO: updateConnectionPool(): Evicting %s for %s", p.connectionIP, p.serviceName)
// decouple mutex
a = append(a, p)
}
Expand Down Expand Up @@ -310,9 +310,13 @@ func updateConnectionPool(serviceName string, currentConnection *connection, loc
}
portSlice := strings.Split(serviceName, ":")
if len(portSlice) < 2 {
log.Fatalf("updateConnectionPool(): No port number supplied in service as stated in README")
log.Fatalf("(kube-grpc) FATAL: updateConnectionPool(): No port number supplied in service as stated in README")
}
conn, err := grpc.Dial(pod.Status.PodIP+":"+portSlice[1], grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
// Connection could not be made, so abort, but still try next pods in list
continue
}
conn, err := grpc.Dial(pod.Status.PodIP+":"+portSlice[1], grpc.WithInsecure())
grpcConn, err := currentConnection.functions.NewGrpcClient(conn)
if err != nil {
// Connection could not be made, so abort, but still try next pods in list
Expand All @@ -329,12 +333,12 @@ func updateConnectionPool(serviceName string, currentConnection *connection, loc
conn: conn,
})
currentConnection.nConnections = len(currentConnection.grpcConnection)
log.Printf("INFO: updateConnectionPool(): Created connection for service %s in namespace %s. Connection pool status %+v",
log.Printf("(kube-grpc) INFO: updateConnectionPool(): Created connection for service %s in namespace %s. Connection pool status %+v",
serviceName, namespace, currentConnection)
}
// Connection pool update might have lead to no connections at all, return appropriate error:
if currentConnection.nConnections == 0 {
return errors.New("No connections made, retry later")
return errors.New("no connections made, retry later")
}
return nil
}
Expand All @@ -343,12 +347,12 @@ func getService(serviceName string, k8sClient typev1.CoreV1Interface) (*corev1.S
listOptions := metav1.ListOptions{}
serviceSlice := strings.Split(serviceName, ".")
if len(serviceSlice) < 2 {
return nil, "", fmt.Errorf("Service name not according to convention defined in README. Service name: %s", serviceName)
return nil, "", fmt.Errorf("service name not according to convention defined in README. Service name: %s", serviceName)
}
namespace := serviceSlice[1]
svcs, err := k8sClient.Services(namespace).List(context.Background(), listOptions)
if err != nil {
log.Fatal(err)
return nil, "", fmt.Errorf("cannot retrieve the services list via the k8s API. Error: %v", err)
}
svcComponents := strings.Split(serviceName, ".")
for _, svc := range svcs.Items {
Expand Down

0 comments on commit ec1180b

Please sign in to comment.