diff --git a/.travis.yml b/.travis.yml index 4abfdb55..6280acee 100644 --- a/.travis.yml +++ b/.travis.yml @@ -11,6 +11,10 @@ install: - go get github.com/rcrowley/go-metrics - go get github.com/rcrowley/go-metrics/influxdb - go get github.com/rcrowley/go-metrics/stathat + - go get github.com/GoogleCloudPlatform/kubernetes/pkg/client + - go get github.com/GoogleCloudPlatform/kubernetes/pkg/api + - go get github.com/GoogleCloudPlatform/kubernetes/pkg/proxy/config + - go get github.com/GoogleCloudPlatform/kubernetes/pkg/util before_script: - go build -o $HOME/gopath/src/github.com/coreos/etcd/etcd.run github.com/coreos/etcd diff --git a/README.md b/README.md index d786defe..c3d6f9ff 100644 --- a/README.md +++ b/README.md @@ -432,6 +432,37 @@ in the etcd backend so a restart of SkyDNS with the same unique value will give ;; ANSWER SECTION: local.dns.skydns.local. 3600 IN A 192.0.2.1 +## Kubernetes +SkyDNS now has primitive support for watching the API of a Kubernetes master and +inserting DNS records to represent the services running in a Kubernetes cluster. + +The service name in Kubernetes will be registered as a host (A) record under the SkyDNS +domain. For example, if you use the default `skydns.local` configuration, a service called +`redismaster` will be available at `redismaster.skydns.local`. Additionally, SRV records +are created for each service that is registered, so queries for SRV records will return all +information necessary to connect to your service: + +``` +;; ANSWER SECTION: +redismaster.skydns.local. 30 IN SRV 10 100 10000 10.0.2.17 +``` +In the query above, you can see the IP address, the weight and the port have been set +by SkyDNS. + +Kubernets support is experimental and will improve with time. To enable it, start SkyDNS +with the `-kubernetes` flag and the client configuration parameters that you would use to connect +to an APIServer instance. At a minimum you need to pass the -master flag. A common example to +start a SkyDNS server: + +``` +sudo skydns -kubernetes -domain kubernetes.local. -master="http://127.0.0.1:8080" +``` +This command starts a SkyDNS service listening on port 53/udp, connecting to the +Kubernetes APIServer on localhost, and serving the domain `kubernetes.local`, meaning all +services in Kubernetes will be resolved in the form `servicename.kubernetes.local` + +For questions on SkyDNS/Kubernetes integration please see the #google-containers channel +on freenode, or open tickets in the SkyDNS repository. # FAQ diff --git a/kubernetes.go b/kubernetes.go new file mode 100644 index 00000000..eac9f20c --- /dev/null +++ b/kubernetes.go @@ -0,0 +1,181 @@ +package main + +import ( + "flag" + "log" + "net" + "sync" + "time" + + "encoding/json" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client" + pconfig "github.com/GoogleCloudPlatform/kubernetes/pkg/proxy/config" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + "github.com/coreos/go-etcd/etcd" + "github.com/skynetservices/skydns/msg" +) + +// The periodic interval for checking the state of things. +const syncInterval = 5 * time.Second + +type KubernetesSync struct { + mu sync.Mutex // protects serviceMap + serviceMap map[string]*serviceInfo + eclient *etcd.Client +} + +func NewKubernetesSync(client *etcd.Client) *KubernetesSync { + ks := &KubernetesSync{ + serviceMap: make(map[string]*serviceInfo), + eclient: client, + } + return ks +} + +// This is a belt-and-suspenders loop that periodically +// addes the records in the local cache of Kubernetes +// services to the skydns repository to prevent them +// from expiring. +func (ksync *KubernetesSync) SyncLoop() { + for { + select { + case <-time.After(syncInterval): + log.Println("Periodic sync") + ksync.ensureDNS() + } + } +} + +// Ensure that dns records exist for all services. +// This seems a bit redundant. TBD - remove? +func (ksync *KubernetesSync) ensureDNS() { + ksync.mu.Lock() + defer ksync.mu.Unlock() + for name, info := range ksync.serviceMap { + err := ksync.addDNS(name, info) + if err != nil { + log.Println("Failed to ensure dns for %q: %s", name, err) + } + } +} + +// OnUpdate manages the active set of service records. +// Active service records get ttl bumps if found in the update set or +// removed if missing from the update set. +func (ksync *KubernetesSync) OnUpdate(services []api.Service) { + activeServices := util.StringSet{} + for _, service := range services { + activeServices.Insert(service.ID) + info, exists := ksync.getServiceInfo(service.ID) + serviceIP := net.ParseIP(service.PortalIP) + if exists && (info.portalPort != service.Port || !info.portalIP.Equal(serviceIP)) { + err := ksync.removeDNS(service.ID, info) + if err != nil { + log.Printf("Failed to remove dns for %q: %s\n", service.ID, err) + } + } + log.Printf("Adding new service %q at %s:%d/%s (local :%d)\n", service.ID, serviceIP, service.Port, service.Protocol, service.ProxyPort) + si := &serviceInfo{ + proxyPort: service.ProxyPort, + protocol: service.Protocol, + active: true, + } + ksync.setServiceInfo(service.ID, si) + si.portalIP = serviceIP + si.portalPort = service.Port + err := ksync.addDNS(service.ID, si) + if err != nil { + log.Println("Failed to add dns %q: %s", service.ID, err) + } + } + ksync.mu.Lock() + defer ksync.mu.Unlock() + for name, info := range ksync.serviceMap { + if !activeServices.Has(name) { + err := ksync.removeDNS(name, info) + if err != nil { + log.Println("Failed to remove dns for %q: %s", name, err) + } + delete(ksync.serviceMap, name) + } + } +} + +func (ksync *KubernetesSync) getServiceInfo(service string) (*serviceInfo, bool) { + ksync.mu.Lock() + defer ksync.mu.Unlock() + info, ok := ksync.serviceMap[service] + return info, ok +} + +func (ksync *KubernetesSync) setServiceInfo(service string, info *serviceInfo) { + ksync.mu.Lock() + defer ksync.mu.Unlock() + ksync.serviceMap[service] = info +} + +func (ksync *KubernetesSync) removeDNS(service string, info *serviceInfo) error { + record := service + "." + config.Domain + // Remove from SkyDNS registration + log.Printf("removing %s from DNS", record) + _, err := ksync.eclient.Delete(msg.Path(record), true) + return err +} + +func (ksync *KubernetesSync) addDNS(service string, info *serviceInfo) error { + // ADD to SkyDNS registry + svc := msg.Service{ + Host: info.portalIP.String(), + Port: info.portalPort, + Priority: 10, + Weight: 10, + Ttl: 30, + } + b, err := json.Marshal(svc) + record := service + "." + config.Domain + //Set with no TTL, and hope that kubernetes events are accurate. + //TODO(BJK) Think this through a little more + + log.Printf("setting dns record: %v\n", record) + _, err = ksync.eclient.Set(msg.Path(record), string(b), uint64(0)) + return err +} + +type serviceInfo struct { + portalIP net.IP + portalPort int + protocol api.Protocol + proxyPort int + mu sync.Mutex // protects active + active bool +} + +func init() { + client.BindClientConfigFlags(flag.CommandLine, clientConfig) +} + +func WatchKubernetes(eclient *etcd.Client) { + serviceConfig := pconfig.NewServiceConfig() + endpointsConfig := pconfig.NewEndpointsConfig() + + // define api config source + if clientConfig.Host != "" { + log.Println("Using api calls to get Kubernetes config %v", clientConfig.Host) + client, err := client.New(clientConfig) + if err != nil { + log.Fatalf("Kubernetes requested, but received invalid API configuration: %v", err) + } + pconfig.NewSourceAPI( + client, + 30*time.Second, + serviceConfig.Channel("api"), + endpointsConfig.Channel("api"), + ) + } + ks := NewKubernetesSync(eclient) + // Wire skydns to handle changes to services + serviceConfig.RegisterHandler(ks) + ks.SyncLoop() +} diff --git a/main.go b/main.go index 71a298a2..d49268ca 100644 --- a/main.go +++ b/main.go @@ -15,6 +15,7 @@ import ( "strings" "time" + kclient "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/coreos/go-etcd/etcd" "github.com/miekg/dns" ) @@ -22,14 +23,16 @@ import ( const Version = "2.0.0h" var ( - tlskey = "" - tlspem = "" - cacert = "" - config = &Config{ReadTimeout: 0, Domain: "", DnsAddr: "", DNSSEC: ""} - nameserver = "" - machine = "" - discover = false - verbose = false + tlskey = "" + tlspem = "" + cacert = "" + config = &Config{ReadTimeout: 0, Domain: "", DnsAddr: "", DNSSEC: ""} + nameserver = "" + machine = "" + discover = false + verbose = false + kubernetes = false + clientConfig = &kclient.Config{} ) const ( @@ -60,6 +63,7 @@ func init() { flag.BoolVar(&discover, "discover", false, "discover new machines by watching /v2/_etcd/machines") flag.BoolVar(&verbose, "verbose", false, "log queries") flag.BoolVar(&config.Systemd, "systemd", false, "bind to socket(s) activated by systemd (ignore -addr)") + flag.BoolVar(&kubernetes, "kubernetes", false, "read endpoints from a kubernetes master") // TTl // Minttl @@ -119,7 +123,9 @@ func main() { } statsCollect() - + if kubernetes { + go WatchKubernetes(client) + } if err := s.Run(); err != nil { log.Fatal(err) } diff --git a/server.go b/server.go index 82f9835d..e2b9a86b 100644 --- a/server.go +++ b/server.go @@ -131,7 +131,6 @@ func (s *server) ServeDNS(w dns.ResponseWriter, req *dns.Msg) { dnssec := false tcp := false - // fuck ANY queries if req.Question[0].Qtype == dns.TypeANY { m.Authoritative = false m.Rcode = dns.RcodeRefused