From d3b17fd63bb6ecf93b5b8a24eca7aa2549fa1fbc Mon Sep 17 00:00:00 2001 From: Brian Ketelsen Date: Sun, 5 Oct 2014 00:00:51 -0400 Subject: [PATCH 1/7] prototype. Adding, but not yet removing. Untested --- kubernetes.go | 178 ++++++++++++++++++++++++++++++++++++++++++++++++++ main.go | 26 +++++--- server.go | 1 - 3 files changed, 194 insertions(+), 11 deletions(-) create mode 100644 kubernetes.go diff --git a/kubernetes.go b/kubernetes.go new file mode 100644 index 00000000..8371a0c0 --- /dev/null +++ b/kubernetes.go @@ -0,0 +1,178 @@ +package main + +import ( + "flag" + "log" + "net" + "sync" + "time" + + "encoding/json" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client" + pconfig "github.com/GoogleCloudPlatform/kubernetes/pkg/proxy/config" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + "github.com/coreos/go-etcd/etcd" + "github.com/skynetservices/skydns/msg" +) + +// The periodic interval for checking the state of things. +const syncInterval = 5 * time.Second + +type KubernetesSync struct { + mu sync.Mutex // protects serviceMap + serviceMap map[string]*serviceInfo + eclient *etcd.Client +} + +func NewKubernetesSync(client *etcd.Client) *KubernetesSync { + ks := &KubernetesSync{ + serviceMap: make(map[string]*serviceInfo), + eclient: client, + } + return ks +} + +// SyncLoop runs periodic work. This is expected to run as a goroutine or as the main loop of the app. It does not return. +func (ksync *KubernetesSync) SyncLoop() { + for { + select { + case <-time.After(syncInterval): + log.Println("Periodic sync") + ksync.ensureDNS() + } + } +} + +// Ensure that dns records exist for all services. +func (ksync *KubernetesSync) ensureDNS() { + ksync.mu.Lock() + defer ksync.mu.Unlock() + for name, info := range ksync.serviceMap { + err := ksync.addDNS(name, info) + if err != nil { + log.Println("Failed to ensure portal for %q: %s", name, err) + } + } +} + +// OnUpdate manages the active set of service records. +// Active service records get ttl bumps if found in the update set or +// removed if missing from the update set. + +func (ksync *KubernetesSync) OnUpdate(services []api.Service) { + log.Println("Received update notice: %+v", services) + activeServices := util.StringSet{} + for _, service := range services { + activeServices.Insert(service.ID) + info, exists := ksync.getServiceInfo(service.ID) + serviceIP := net.ParseIP(service.PortalIP) + if exists && info.portalPort == service.Port && info.portalIP.Equal(serviceIP) { + //bump TTL + } + if exists && (info.portalPort != service.Port || !info.portalIP.Equal(serviceIP)) { + err := ksync.removeDNS(service.ID, info) + if err != nil { + log.Println("Failed to remove dns for %q: %s", service.ID, err) + } + } + log.Println("Adding new service %q at %s:%d/%s (local :%d)", service.ID, serviceIP, service.Port, service.Protocol, service.ProxyPort) + + si := &serviceInfo{ + proxyPort: service.ProxyPort, + protocol: service.Protocol, + active: true, + } + ksync.setServiceInfo(service.ID, si) + + info.portalIP = serviceIP + info.portalPort = service.Port + err := ksync.addDNS(service.ID, info) + if err != nil { + log.Println("Failed to add dns %q: %s", service.ID, err) + } + } + ksync.mu.Lock() + defer ksync.mu.Unlock() + for name, info := range ksync.serviceMap { + if !activeServices.Has(name) { + err := ksync.removeDNS(name, info) + if err != nil { + log.Println("Failed to remove dns for %q: %s", name, err) + } + } + } +} + +func (ksync *KubernetesSync) getServiceInfo(service string) (*serviceInfo, bool) { + ksync.mu.Lock() + defer ksync.mu.Unlock() + info, ok := ksync.serviceMap[service] + return info, ok +} + +func (ksync *KubernetesSync) setServiceInfo(service string, info *serviceInfo) { + ksync.mu.Lock() + defer ksync.mu.Unlock() + ksync.serviceMap[service] = info +} + +func (ksync *KubernetesSync) removeDNS(service string, info *serviceInfo) error { + // Remove from SkyDNS registration + return nil +} + +func (ksync *KubernetesSync) addDNS(service string, info *serviceInfo) error { + // ADD to SkyDNS registry + svc := msg.Service{ + Host: info.portalIP.String(), + Port: info.portalPort, + Priority: 10, + Weight: 10, + Ttl: 30, + } + b, err := json.Marshal(svc) + record := service + config.Domain + //Set with no TTL, and hope that kubernetes events are accurate. + //TODO(BJK) Think this through a little more + _, err = ksync.eclient.Set(msg.Path(record), string(b), uint64(0)) + return err +} + +type serviceInfo struct { + portalIP net.IP + portalPort int + protocol api.Protocol + proxyPort int + mu sync.Mutex // protects active + active bool +} + +func init() { + client.BindClientConfigFlags(flag.CommandLine, clientConfig) +} + +func WatchKubernetes(eclient *etcd.Client) { + serviceConfig := pconfig.NewServiceConfig() + endpointsConfig := pconfig.NewEndpointsConfig() + + // define api config source + if clientConfig.Host != "" { + log.Println("Using api calls to get config %v", clientConfig.Host) + client, err := client.New(clientConfig) + if err != nil { + log.Fatalf("Kubernetes requested, but received invalid API configuration: %v", err) + } + pconfig.NewSourceAPI( + client, + 30*time.Second, + serviceConfig.Channel("api"), + endpointsConfig.Channel("api"), + ) + } + ks := NewKubernetesSync(eclient) + // Wire skydns to handle changes to services + serviceConfig.RegisterHandler(ks) + ks.SyncLoop() +} diff --git a/main.go b/main.go index b388f164..60dadb8a 100644 --- a/main.go +++ b/main.go @@ -14,6 +14,7 @@ import ( "strings" "time" + kclient "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/coreos/go-etcd/etcd" "github.com/miekg/dns" ) @@ -21,14 +22,16 @@ import ( const Version = "2.0.0g" var ( - tlskey = "" - tlspem = "" - cacert = "" - config = &Config{ReadTimeout: 0, Domain: "", DnsAddr: "", DNSSEC: ""} - nameserver = "" - machine = "" - discover = false - verbose = false + tlskey = "" + tlspem = "" + cacert = "" + config = &Config{ReadTimeout: 0, Domain: "", DnsAddr: "", DNSSEC: ""} + nameserver = "" + machine = "" + discover = false + verbose = false + kubernetes = false + clientConfig = &kclient.Config{} ) const ( @@ -59,6 +62,7 @@ func init() { flag.BoolVar(&discover, "discover", false, "discover new machines by watching /v2/_etcd/machines") flag.BoolVar(&verbose, "verbose", false, "log queries") flag.BoolVar(&config.Systemd, "systemd", false, "bind to socket(s) activated by systemd (ignore -addr)") + flag.BoolVar(&kubernetes, "kubernetes", false, "read endpoints from a kubernetes master") // TTl // Minttl @@ -108,7 +112,7 @@ func main() { s.config.log.Infof("ectd machine cluster update failed, sleeping %s", duration) time.Sleep(duration) duration *= 2 - if duration > 32 * time.Second { + if duration > 32*time.Second { duration = 32 * time.Second } } @@ -118,7 +122,9 @@ func main() { } statsCollect() - + if kubernetes { + go WatchKubernetes(client) + } if err := s.Run(); err != nil { log.Fatal(err) } diff --git a/server.go b/server.go index 82f9835d..e2b9a86b 100644 --- a/server.go +++ b/server.go @@ -131,7 +131,6 @@ func (s *server) ServeDNS(w dns.ResponseWriter, req *dns.Msg) { dnssec := false tcp := false - // fuck ANY queries if req.Question[0].Qtype == dns.TypeANY { m.Authoritative = false m.Rcode = dns.RcodeRefused From cea36fd1c7797c07bd0066b7c8290f191d774f62 Mon Sep 17 00:00:00 2001 From: Brian Ketelsen Date: Wed, 8 Oct 2014 09:23:33 -0400 Subject: [PATCH 2/7] adding and removing working, loop logic broken --- kubernetes.go | 24 ++++++++++++++---------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/kubernetes.go b/kubernetes.go index 8371a0c0..a5744949 100644 --- a/kubernetes.go +++ b/kubernetes.go @@ -62,7 +62,7 @@ func (ksync *KubernetesSync) ensureDNS() { // removed if missing from the update set. func (ksync *KubernetesSync) OnUpdate(services []api.Service) { - log.Println("Received update notice: %+v", services) + log.Printf("Received update notice: %+v\n", services) activeServices := util.StringSet{} for _, service := range services { activeServices.Insert(service.ID) @@ -70,25 +70,24 @@ func (ksync *KubernetesSync) OnUpdate(services []api.Service) { serviceIP := net.ParseIP(service.PortalIP) if exists && info.portalPort == service.Port && info.portalIP.Equal(serviceIP) { //bump TTL + log.Println("Service exists.") } if exists && (info.portalPort != service.Port || !info.portalIP.Equal(serviceIP)) { err := ksync.removeDNS(service.ID, info) if err != nil { - log.Println("Failed to remove dns for %q: %s", service.ID, err) + log.Printf("Failed to remove dns for %q: %s\n", service.ID, err) } } - log.Println("Adding new service %q at %s:%d/%s (local :%d)", service.ID, serviceIP, service.Port, service.Protocol, service.ProxyPort) - + log.Printf("Adding new service %q at %s:%d/%s (local :%d)\n", service.ID, serviceIP, service.Port, service.Protocol, service.ProxyPort) si := &serviceInfo{ proxyPort: service.ProxyPort, protocol: service.Protocol, active: true, } ksync.setServiceInfo(service.ID, si) - - info.portalIP = serviceIP - info.portalPort = service.Port - err := ksync.addDNS(service.ID, info) + si.portalIP = serviceIP + si.portalPort = service.Port + err := ksync.addDNS(service.ID, si) if err != nil { log.Println("Failed to add dns %q: %s", service.ID, err) } @@ -119,8 +118,11 @@ func (ksync *KubernetesSync) setServiceInfo(service string, info *serviceInfo) { } func (ksync *KubernetesSync) removeDNS(service string, info *serviceInfo) error { + record := service + "." + config.Domain // Remove from SkyDNS registration - return nil + log.Printf("removing %s from DNS", record) + _, err := ksync.eclient.Delete(msg.Path(record), true) + return err } func (ksync *KubernetesSync) addDNS(service string, info *serviceInfo) error { @@ -133,9 +135,11 @@ func (ksync *KubernetesSync) addDNS(service string, info *serviceInfo) error { Ttl: 30, } b, err := json.Marshal(svc) - record := service + config.Domain + record := service + "." + config.Domain //Set with no TTL, and hope that kubernetes events are accurate. //TODO(BJK) Think this through a little more + + log.Printf("Setting dns record: %v\n", record) _, err = ksync.eclient.Set(msg.Path(record), string(b), uint64(0)) return err } From 90b05196a58f575b367749e3bb0712cb38a5a9ff Mon Sep 17 00:00:00 2001 From: Brian Ketelsen Date: Wed, 8 Oct 2014 13:30:28 -0400 Subject: [PATCH 3/7] working --- kubernetes.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/kubernetes.go b/kubernetes.go index a5744949..f0441406 100644 --- a/kubernetes.go +++ b/kubernetes.go @@ -46,13 +46,14 @@ func (ksync *KubernetesSync) SyncLoop() { } // Ensure that dns records exist for all services. +// This seems a bit redundant. TBD - remove? func (ksync *KubernetesSync) ensureDNS() { ksync.mu.Lock() defer ksync.mu.Unlock() for name, info := range ksync.serviceMap { err := ksync.addDNS(name, info) if err != nil { - log.Println("Failed to ensure portal for %q: %s", name, err) + log.Println("Failed to ensure dns for %q: %s", name, err) } } } @@ -100,6 +101,7 @@ func (ksync *KubernetesSync) OnUpdate(services []api.Service) { if err != nil { log.Println("Failed to remove dns for %q: %s", name, err) } + delete(ksync.serviceMap, name) } } } @@ -163,7 +165,7 @@ func WatchKubernetes(eclient *etcd.Client) { // define api config source if clientConfig.Host != "" { - log.Println("Using api calls to get config %v", clientConfig.Host) + log.Println("Using api calls to get Kubernetes config %v", clientConfig.Host) client, err := client.New(clientConfig) if err != nil { log.Fatalf("Kubernetes requested, but received invalid API configuration: %v", err) From 96f5d291d5b6cff2905b6a100ad095094ed76c64 Mon Sep 17 00:00:00 2001 From: Brian Ketelsen Date: Thu, 16 Oct 2014 14:25:52 -0400 Subject: [PATCH 4/7] small cleanup --- kubernetes.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/kubernetes.go b/kubernetes.go index f0441406..d0a4fbfd 100644 --- a/kubernetes.go +++ b/kubernetes.go @@ -63,16 +63,11 @@ func (ksync *KubernetesSync) ensureDNS() { // removed if missing from the update set. func (ksync *KubernetesSync) OnUpdate(services []api.Service) { - log.Printf("Received update notice: %+v\n", services) activeServices := util.StringSet{} for _, service := range services { activeServices.Insert(service.ID) info, exists := ksync.getServiceInfo(service.ID) serviceIP := net.ParseIP(service.PortalIP) - if exists && info.portalPort == service.Port && info.portalIP.Equal(serviceIP) { - //bump TTL - log.Println("Service exists.") - } if exists && (info.portalPort != service.Port || !info.portalIP.Equal(serviceIP)) { err := ksync.removeDNS(service.ID, info) if err != nil { From cdd379ac1e35d1f22a108a0bb59b3f708530b91e Mon Sep 17 00:00:00 2001 From: Brian Ketelsen Date: Thu, 16 Oct 2014 14:40:11 -0400 Subject: [PATCH 5/7] documentation --- README.md | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/README.md b/README.md index 8304b4e8..edd47c73 100644 --- a/README.md +++ b/README.md @@ -432,6 +432,37 @@ in the etcd backend so a restart of SkyDNS with the same unique value will give ;; ANSWER SECTION: local.dns.skydns.local. 3600 IN A 192.0.2.1 +## Kubernetes +SkyDNS now has primitive support for watching the API of a Kubernetes master and +inserting DNS records to represent the services running in a Kubernetes cluster. + +The service name in Kubernetes will be registered as a host (A) record under the SkyDNS +domain. For example, if you use the default `skydns.local` configuration, a service called +`redismaster` will be available at `redismaster.skydns.local`. Additionally, SRV records +are created for each service that is registered, so queries for SRV records will return all +information necessary to connect to your service: + +``` +;; ANSWER SECTION: +redismaster.skydns.local. 30 IN SRV 10 100 10000 10.0.2.17 +``` +In the query above, you can see the IP address, the weight and the port have been set +by SkyDNS. + +Kubernets support is experimental and will improve with time. To enable it, start SkyDNS +with the `-kubernetes` flag and the client configuration parameters that you would use to connect +to an APIServer instance. At a minimum you need to pass the -master flag. A common example to +start a SkyDNS server: + +``` +sudo skydns -kubernetes -domain kubernetes.local. -master="http://127.0.0.1:8080" +``` +This command starts a SkyDNS service listening on port 53/udp, connecting to the +Kubernetes APIServer on localhost, and serving the domain `kubernetes.local`, meaning all +services in Kubernetes will be resolved in the form `servicename.kubernetes.local` + +For questions on SkyDNS/Kubernetes integration please see the #google-containers channel +on freenode, or open tickets in the SkyDNS repository. # FAQ From de3714fd9d7ab623dc08d19197c1d32eee296bd6 Mon Sep 17 00:00:00 2001 From: Brian Ketelsen Date: Thu, 16 Oct 2014 14:47:53 -0400 Subject: [PATCH 6/7] stupid travis --- .travis.yml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/.travis.yml b/.travis.yml index 4abfdb55..6280acee 100644 --- a/.travis.yml +++ b/.travis.yml @@ -11,6 +11,10 @@ install: - go get github.com/rcrowley/go-metrics - go get github.com/rcrowley/go-metrics/influxdb - go get github.com/rcrowley/go-metrics/stathat + - go get github.com/GoogleCloudPlatform/kubernetes/pkg/client + - go get github.com/GoogleCloudPlatform/kubernetes/pkg/api + - go get github.com/GoogleCloudPlatform/kubernetes/pkg/proxy/config + - go get github.com/GoogleCloudPlatform/kubernetes/pkg/util before_script: - go build -o $HOME/gopath/src/github.com/coreos/etcd/etcd.run github.com/coreos/etcd From 516d273ca4d9200124d5dc610045040ec9a3909d Mon Sep 17 00:00:00 2001 From: Brian Ketelsen Date: Thu, 16 Oct 2014 15:16:43 -0400 Subject: [PATCH 7/7] resolving comments --- kubernetes.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/kubernetes.go b/kubernetes.go index d0a4fbfd..eac9f20c 100644 --- a/kubernetes.go +++ b/kubernetes.go @@ -34,7 +34,10 @@ func NewKubernetesSync(client *etcd.Client) *KubernetesSync { return ks } -// SyncLoop runs periodic work. This is expected to run as a goroutine or as the main loop of the app. It does not return. +// This is a belt-and-suspenders loop that periodically +// addes the records in the local cache of Kubernetes +// services to the skydns repository to prevent them +// from expiring. func (ksync *KubernetesSync) SyncLoop() { for { select { @@ -61,7 +64,6 @@ func (ksync *KubernetesSync) ensureDNS() { // OnUpdate manages the active set of service records. // Active service records get ttl bumps if found in the update set or // removed if missing from the update set. - func (ksync *KubernetesSync) OnUpdate(services []api.Service) { activeServices := util.StringSet{} for _, service := range services { @@ -136,7 +138,7 @@ func (ksync *KubernetesSync) addDNS(service string, info *serviceInfo) error { //Set with no TTL, and hope that kubernetes events are accurate. //TODO(BJK) Think this through a little more - log.Printf("Setting dns record: %v\n", record) + log.Printf("setting dns record: %v\n", record) _, err = ksync.eclient.Set(msg.Path(record), string(b), uint64(0)) return err }