Skip to content

Commit

Permalink
Merge pull request #91 from bketelsen/master
Browse files Browse the repository at this point in the history
Add support for polling a Kubernetes Master server
  • Loading branch information
bketelsen committed Oct 16, 2014
2 parents e740804 + 516d273 commit 30201d7
Show file tree
Hide file tree
Showing 5 changed files with 231 additions and 10 deletions.
4 changes: 4 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ install:
- go get github.com/rcrowley/go-metrics
- go get github.com/rcrowley/go-metrics/influxdb
- go get github.com/rcrowley/go-metrics/stathat
- go get github.com/GoogleCloudPlatform/kubernetes/pkg/client
- go get github.com/GoogleCloudPlatform/kubernetes/pkg/api
- go get github.com/GoogleCloudPlatform/kubernetes/pkg/proxy/config
- go get github.com/GoogleCloudPlatform/kubernetes/pkg/util

before_script:
- go build -o $HOME/gopath/src/github.com/coreos/etcd/etcd.run github.com/coreos/etcd
Expand Down
31 changes: 31 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,37 @@ in the etcd backend so a restart of SkyDNS with the same unique value will give

;; ANSWER SECTION:
local.dns.skydns.local. 3600 IN A 192.0.2.1
## Kubernetes
SkyDNS now has primitive support for watching the API of a Kubernetes master and
inserting DNS records to represent the services running in a Kubernetes cluster.

The service name in Kubernetes will be registered as a host (A) record under the SkyDNS
domain. For example, if you use the default `skydns.local` configuration, a service called
`redismaster` will be available at `redismaster.skydns.local`. Additionally, SRV records
are created for each service that is registered, so queries for SRV records will return all
information necessary to connect to your service:

```
;; ANSWER SECTION:
redismaster.skydns.local. 30 IN SRV 10 100 10000 10.0.2.17
```
In the query above, you can see the IP address, the weight and the port have been set
by SkyDNS.

Kubernets support is experimental and will improve with time. To enable it, start SkyDNS
with the `-kubernetes` flag and the client configuration parameters that you would use to connect
to an APIServer instance. At a minimum you need to pass the -master flag. A common example to
start a SkyDNS server:

```
sudo skydns -kubernetes -domain kubernetes.local. -master="http://127.0.0.1:8080"
```
This command starts a SkyDNS service listening on port 53/udp, connecting to the
Kubernetes APIServer on localhost, and serving the domain `kubernetes.local`, meaning all
services in Kubernetes will be resolved in the form `servicename.kubernetes.local`

For questions on SkyDNS/Kubernetes integration please see the #google-containers channel
on freenode, or open tickets in the SkyDNS repository.

# FAQ

Expand Down
181 changes: 181 additions & 0 deletions kubernetes.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
package main

import (
"flag"
"log"
"net"
"sync"
"time"

"encoding/json"

"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
pconfig "github.com/GoogleCloudPlatform/kubernetes/pkg/proxy/config"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/coreos/go-etcd/etcd"
"github.com/skynetservices/skydns/msg"
)

// The periodic interval for checking the state of things.
const syncInterval = 5 * time.Second

type KubernetesSync struct {
mu sync.Mutex // protects serviceMap
serviceMap map[string]*serviceInfo
eclient *etcd.Client
}

func NewKubernetesSync(client *etcd.Client) *KubernetesSync {
ks := &KubernetesSync{
serviceMap: make(map[string]*serviceInfo),
eclient: client,
}
return ks
}

// This is a belt-and-suspenders loop that periodically
// addes the records in the local cache of Kubernetes
// services to the skydns repository to prevent them
// from expiring.
func (ksync *KubernetesSync) SyncLoop() {
for {
select {
case <-time.After(syncInterval):
log.Println("Periodic sync")
ksync.ensureDNS()
}
}
}

// Ensure that dns records exist for all services.
// This seems a bit redundant. TBD - remove?
func (ksync *KubernetesSync) ensureDNS() {
ksync.mu.Lock()
defer ksync.mu.Unlock()
for name, info := range ksync.serviceMap {
err := ksync.addDNS(name, info)
if err != nil {
log.Println("Failed to ensure dns for %q: %s", name, err)
}
}
}

// OnUpdate manages the active set of service records.
// Active service records get ttl bumps if found in the update set or
// removed if missing from the update set.
func (ksync *KubernetesSync) OnUpdate(services []api.Service) {
activeServices := util.StringSet{}
for _, service := range services {
activeServices.Insert(service.ID)
info, exists := ksync.getServiceInfo(service.ID)
serviceIP := net.ParseIP(service.PortalIP)
if exists && (info.portalPort != service.Port || !info.portalIP.Equal(serviceIP)) {
err := ksync.removeDNS(service.ID, info)
if err != nil {
log.Printf("Failed to remove dns for %q: %s\n", service.ID, err)
}
}
log.Printf("Adding new service %q at %s:%d/%s (local :%d)\n", service.ID, serviceIP, service.Port, service.Protocol, service.ProxyPort)
si := &serviceInfo{
proxyPort: service.ProxyPort,
protocol: service.Protocol,
active: true,
}
ksync.setServiceInfo(service.ID, si)
si.portalIP = serviceIP
si.portalPort = service.Port
err := ksync.addDNS(service.ID, si)
if err != nil {
log.Println("Failed to add dns %q: %s", service.ID, err)
}
}
ksync.mu.Lock()
defer ksync.mu.Unlock()
for name, info := range ksync.serviceMap {
if !activeServices.Has(name) {
err := ksync.removeDNS(name, info)
if err != nil {
log.Println("Failed to remove dns for %q: %s", name, err)
}
delete(ksync.serviceMap, name)
}
}
}

func (ksync *KubernetesSync) getServiceInfo(service string) (*serviceInfo, bool) {
ksync.mu.Lock()
defer ksync.mu.Unlock()
info, ok := ksync.serviceMap[service]
return info, ok
}

func (ksync *KubernetesSync) setServiceInfo(service string, info *serviceInfo) {
ksync.mu.Lock()
defer ksync.mu.Unlock()
ksync.serviceMap[service] = info
}

func (ksync *KubernetesSync) removeDNS(service string, info *serviceInfo) error {
record := service + "." + config.Domain
// Remove from SkyDNS registration
log.Printf("removing %s from DNS", record)
_, err := ksync.eclient.Delete(msg.Path(record), true)
return err
}

func (ksync *KubernetesSync) addDNS(service string, info *serviceInfo) error {
// ADD to SkyDNS registry
svc := msg.Service{
Host: info.portalIP.String(),
Port: info.portalPort,
Priority: 10,
Weight: 10,
Ttl: 30,
}
b, err := json.Marshal(svc)
record := service + "." + config.Domain
//Set with no TTL, and hope that kubernetes events are accurate.
//TODO(BJK) Think this through a little more

log.Printf("setting dns record: %v\n", record)
_, err = ksync.eclient.Set(msg.Path(record), string(b), uint64(0))
return err
}

type serviceInfo struct {
portalIP net.IP
portalPort int
protocol api.Protocol
proxyPort int
mu sync.Mutex // protects active
active bool
}

func init() {
client.BindClientConfigFlags(flag.CommandLine, clientConfig)
}

func WatchKubernetes(eclient *etcd.Client) {
serviceConfig := pconfig.NewServiceConfig()
endpointsConfig := pconfig.NewEndpointsConfig()

// define api config source
if clientConfig.Host != "" {
log.Println("Using api calls to get Kubernetes config %v", clientConfig.Host)
client, err := client.New(clientConfig)
if err != nil {
log.Fatalf("Kubernetes requested, but received invalid API configuration: %v", err)
}
pconfig.NewSourceAPI(
client,
30*time.Second,
serviceConfig.Channel("api"),
endpointsConfig.Channel("api"),
)
}
ks := NewKubernetesSync(eclient)
// Wire skydns to handle changes to services
serviceConfig.RegisterHandler(ks)
ks.SyncLoop()
}
24 changes: 15 additions & 9 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,24 @@ import (
"strings"
"time"

kclient "github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/coreos/go-etcd/etcd"
"github.com/miekg/dns"
)

const Version = "2.0.0h"

var (
tlskey = ""
tlspem = ""
cacert = ""
config = &Config{ReadTimeout: 0, Domain: "", DnsAddr: "", DNSSEC: ""}
nameserver = ""
machine = ""
discover = false
verbose = false
tlskey = ""
tlspem = ""
cacert = ""
config = &Config{ReadTimeout: 0, Domain: "", DnsAddr: "", DNSSEC: ""}
nameserver = ""
machine = ""
discover = false
verbose = false
kubernetes = false
clientConfig = &kclient.Config{}
)

const (
Expand Down Expand Up @@ -60,6 +63,7 @@ func init() {
flag.BoolVar(&discover, "discover", false, "discover new machines by watching /v2/_etcd/machines")
flag.BoolVar(&verbose, "verbose", false, "log queries")
flag.BoolVar(&config.Systemd, "systemd", false, "bind to socket(s) activated by systemd (ignore -addr)")
flag.BoolVar(&kubernetes, "kubernetes", false, "read endpoints from a kubernetes master")

// TTl
// Minttl
Expand Down Expand Up @@ -119,7 +123,9 @@ func main() {
}

statsCollect()

if kubernetes {
go WatchKubernetes(client)
}
if err := s.Run(); err != nil {
log.Fatal(err)
}
Expand Down
1 change: 0 additions & 1 deletion server.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,6 @@ func (s *server) ServeDNS(w dns.ResponseWriter, req *dns.Msg) {
dnssec := false
tcp := false

// fuck ANY queries
if req.Question[0].Qtype == dns.TypeANY {
m.Authoritative = false
m.Rcode = dns.RcodeRefused
Expand Down

0 comments on commit 30201d7

Please sign in to comment.