Skip to content

Commit

Permalink
Oplogtoredis: Add Configurable Denylist to HTTP Server (#64)
Browse files Browse the repository at this point in the history
This adds a new feature- a configurable denylist.

The denylist is a list of rules. Each rule is a list of keys and a regex. 

Every incoming oplog message will be checked against every rule. The keys will be used sequentially to index into the oplog message, and if the value found is a string, it will be checked against the regex. If the regex is found for _any_ rule in the denylist, the oplog message will be skipped and will not be reported to Redis.

The denylist can be viewed by HTTP methods (GET /denylist and GET /denylist/:ruleID), and can be configured in the same manner (PUT /denylist and DELETE /denylist/:ruleID).

Default behavior is unchanged, since the denylist is empty.

This is useful when an external context knows that some oplog messages are not necessary to be transferred. The external context can inform Oplogtoredis via the denylist, and then monitor the internal state via the HTTP API.
  • Loading branch information
alex-goodisman committed Apr 26, 2024
1 parent 137014e commit 38e75cd
Show file tree
Hide file tree
Showing 9 changed files with 323 additions and 9 deletions.
2 changes: 1 addition & 1 deletion default.nix
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ buildGoModule {
'';

# update: set value to an empty string and run `nix build`. This will download Go, fetch the dependencies and calculates their hash.
vendorHash = "sha256-ceToA2DC1bhmg9WIeNSAfoNoU7sk9PrQqgqt5UbpivQ=";
vendorHash = "sha256-Vh7O0iMPG6nAvcyv92h5TVZS2awnR0vz75apyzJeu4c=";

nativeBuildInputs = [ installShellFiles ];
doCheck = false;
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
github.com/alicebob/miniredis v2.5.0+incompatible
github.com/deckarep/golang-set v1.7.1
github.com/go-redis/redis/v7 v7.4.1
github.com/go-redis/redis/v8 v8.11.5
github.com/gorilla/websocket v1.4.2
github.com/juju/mgo/v2 v2.0.0-20210302023703-70d5d206e208
github.com/juju/replicaset v0.0.0-20210302050932-0303c8575745
Expand All @@ -29,10 +30,10 @@ require (
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/go-redis/redis/v8 v8.11.5 // indirect
github.com/golang/protobuf v1.4.3 // indirect
github.com/golang/snappy v0.0.1 // indirect
github.com/gomodule/redigo v1.8.5 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/juju/clock v0.0.0-20190205081909-9c5c9712527c // indirect
github.com/juju/errors v0.0.0-20200330140219-3fe23663418f // indirect
github.com/juju/loggo v0.0.0-20200526014432-9ce3a2e09b5e // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,8 @@ github.com/google/pprof v0.0.0-20200229191704-1ebb73c60ed3/go.mod h1:ZgVRPoUq/hf
github.com/google/pprof v0.0.0-20200430221834-fc25d7d30c6d/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM=
github.com/google/pprof v0.0.0-20200708004538-1a94d8640e99/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM=
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=
github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
Expand Down
88 changes: 88 additions & 0 deletions integration-tests/acceptance/denylist_http_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package main

import (
"bytes"
"encoding/json"
"io"
"net/http"
"os"
"reflect"
"testing"
)

func doRequest(method string, path string, t *testing.T, expectedCode int) interface{} {
req, err := http.NewRequest(method, os.Getenv("OTR_URL")+path, &bytes.Buffer{})
if err != nil {
t.Fatalf("Error creating req: %s", err)
}
req.Header.Set("Content-Type", "application/json")
resp, err := (&http.Client{}).Do(req)
if err != nil {
t.Fatalf("Error sending request: %s", err)
}

defer resp.Body.Close()

respBody, err := io.ReadAll(resp.Body)
if err != nil {
t.Fatalf("Error eceiving response body: %s", err)
}

if resp.StatusCode != expectedCode {
t.Fatalf("Expected status code %d, but got %d.\nBody was: %s", expectedCode, resp.StatusCode, respBody)
}

if expectedCode == 200 {
var data interface{}
err = json.Unmarshal(respBody, &data)
if err != nil {
t.Fatalf("Error parsing JSON response: %s", err)
}

return data
}
return nil
}

// Test the /denylist HTTP operations
func TestDenyList(t *testing.T) {
// GET empty list of rules
data := doRequest("GET", "/denylist", t, 200)
if !reflect.DeepEqual(data, []interface{}{}) {
t.Fatalf("Expected empty list from blank GET, but got %#v", data)
}
// PUT new rule
doRequest("PUT", "/denylist/abc", t, 201)
// GET list with new rule in it
data = doRequest("GET", "/denylist", t, 200)
if !reflect.DeepEqual(data, []interface{}{"abc"}) {
t.Fatalf("Expected singleton from GET, but got %#v", data)
}
// GET existing rule
data = doRequest("GET", "/denylist/abc", t, 200)
if !reflect.DeepEqual(data, "abc") {
t.Fatalf("Expected matched body from GET, but got %#v", data)
}
// PUT second rule
doRequest("PUT", "/denylist/def", t, 201)
// GET second rule
data = doRequest("GET", "/denylist/def", t, 200)
if !reflect.DeepEqual(data, "def") {
t.Fatalf("Expected matched body from GET, but got %#v", data)
}
// GET list with both rules
data = doRequest("GET", "/denylist", t, 200)
// check both permutations, in case the server reordered them
if !reflect.DeepEqual(data, []interface{}{"abc", "def"}) && !reflect.DeepEqual(data, []interface{}{"def", "abc"}) {
t.Fatalf("Expected doubleton from GET, but got %#v", data)
}
// DELETE first rule
doRequest("DELETE", "/denylist/abc", t, 204)
// GET first rule
doRequest("GET", "/denylist/abc", t, 404)
// GET list with only second rule
data = doRequest("GET", "/denylist", t, 200)
if !reflect.DeepEqual(data, []interface{}{"def"}) {
t.Fatalf("Expected singleton from GET, but got %#V", data)
}
}
72 changes: 72 additions & 0 deletions integration-tests/acceptance/denylist_oplog_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package main

import (
"context"
"testing"

"github.com/tulip/oplogtoredis/integration-tests/helpers"
"go.mongodb.org/mongo-driver/bson"
)

func TestDenyOplog(t *testing.T) {
harness := startHarness()
defer harness.stop()

_, err := harness.mongoClient.Collection("Foo").InsertOne(context.Background(), bson.M{
"_id": "id1",
"f": "1",
})
if err != nil {
panic(err)
}

expectedMessage1 := helpers.OTRMessage{
Event: "i",
Document: map[string]interface{}{
"_id": "id1",
},
Fields: []string{"_id", "f"},
}

harness.verify(t, map[string][]helpers.OTRMessage{
"tests.Foo": {expectedMessage1},
"tests.Foo::id1": {expectedMessage1},
})

doRequest("PUT", "/denylist/tests", t, 201)

_, err = harness.mongoClient.Collection("Foo").InsertOne(context.Background(), bson.M{
"_id": "id2",
"g": "2",
})
if err != nil {
panic(err)
}

// second message should not have been received, since it got denied
harness.verify(t, map[string][]helpers.OTRMessage{})

doRequest("DELETE", "/denylist/tests", t, 204)

_, err = harness.mongoClient.Collection("Foo").InsertOne(context.Background(), bson.M{
"_id": "id3",
"h": "3",
})
if err != nil {
panic(err)
}

expectedMessage3 := helpers.OTRMessage{
Event: "i",
Document: map[string]interface{}{
"_id": "id3",
},
Fields: []string{"_id", "h"},
}

// back to normal now that the deny rule is gone
harness.verify(t, map[string][]helpers.OTRMessage{
"tests.Foo": {expectedMessage3},
"tests.Foo::id3": {expectedMessage3},
})
}
127 changes: 127 additions & 0 deletions lib/denylist/http.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package denylist

import (
"encoding/json"
"net/http"
"strings"
"sync"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/tulip/oplogtoredis/lib/log"
)

var metricFilterEnabled = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "otr",
Subsystem: "denylist",
Name: "filter_enabled",
Help: "Gauge indicating whether the denylist filter is enabled for a particular DB name",
}, []string{"db"})

// CollectionEndpoint serves the endpoints for the whole Denylist at /denylist
func CollectionEndpoint(denylist *sync.Map) func(http.ResponseWriter, *http.Request) {
return func(response http.ResponseWriter, request *http.Request) {
switch request.Method {
case "GET":
listDenylistKeys(response, denylist)
default:
http.Error(response, http.StatusText(http.StatusNotFound), http.StatusNotFound)
}
}
}

// SingleEndpoint serves the endpoints for particular Denylist entries at /denylist/...
func SingleEndpoint(denylist *sync.Map) func(http.ResponseWriter, *http.Request) {
return func(response http.ResponseWriter, request *http.Request) {
switch request.Method {
case "GET":
getDenylistEntry(response, request, denylist)
case "PUT":
createDenylistEntry(response, request, denylist)
case "DELETE":
deleteDenylistEntry(response, request, denylist)
default:
http.Error(response, http.StatusText(http.StatusNotFound), http.StatusNotFound)
}
}
}

// GET /denylist
func listDenylistKeys(response http.ResponseWriter, denylist *sync.Map) {
keys := []interface{}{}

denylist.Range(func(key interface{}, value interface{}) bool {
keys = append(keys, key)
return true
})

response.Header().Set("Content-Type", "application/json")
response.WriteHeader(http.StatusOK)
err := json.NewEncoder(response).Encode(keys)
if err != nil {
http.Error(response, "couldn't encode result", http.StatusInternalServerError)
return
}
}

// GET /denylist/...
func getDenylistEntry(response http.ResponseWriter, request *http.Request, denylist *sync.Map) {
id := request.URL.Path
if strings.Contains(id, "/") {
http.Error(response, http.StatusText(http.StatusNotFound), http.StatusNotFound)
return
}
_, exists := denylist.Load(id)
if !exists {
http.Error(response, "denylist entry not found with that id", http.StatusNotFound)
return
}

response.Header().Set("Content-Type", "application/json")
response.WriteHeader(http.StatusOK)
err := json.NewEncoder(response).Encode(id)
if err != nil {
http.Error(response, "couldn't encode result", http.StatusInternalServerError)
return
}
}

// PUT /denylist/...
func createDenylistEntry(response http.ResponseWriter, request *http.Request, denylist *sync.Map) {
id := request.URL.Path
if strings.Contains(id, "/") {
http.Error(response, http.StatusText(http.StatusNotFound), http.StatusNotFound)
return
}
_, exists := denylist.Load(id)
if exists {
response.WriteHeader(http.StatusNoContent)
return
}

denylist.Store(id, true)
log.Log.Infow("Created denylist entry", "id", id)
metricFilterEnabled.WithLabelValues(id).Set(1)

response.WriteHeader(http.StatusCreated)
}

// DELETE /denylist/...
func deleteDenylistEntry(response http.ResponseWriter, request *http.Request, denylist *sync.Map) {
id := request.URL.Path
if strings.Contains(id, "/") {
http.Error(response, http.StatusText(http.StatusNotFound), http.StatusNotFound)
return
}
_, exists := denylist.Load(id)
if !exists {
http.Error(response, "denylist entry not found with that id", http.StatusNotFound)
return
}

denylist.Delete(id)
log.Log.Infow("Deleted denylist entry", "id", id)
metricFilterEnabled.WithLabelValues(id).Set(0)

response.WriteHeader(http.StatusNoContent)
}
20 changes: 18 additions & 2 deletions lib/oplog/tail.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"errors"
"strings"
"sync"
"time"

"github.com/tulip/oplogtoredis/lib/config"
Expand All @@ -29,6 +30,7 @@ type Tailer struct {
RedisClients []redis.UniversalClient
RedisPrefix string
MaxCatchUp time.Duration
Denylist *sync.Map
}

// Raw oplog entry from Mongo
Expand Down Expand Up @@ -99,6 +101,13 @@ var (
Name: "last_entry_staleness_seconds",
Help: "Gauge recording the difference between this server's clock and the timestamp on the last read oplog entry.",
})

metricOplogEntriesFiltered = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "otr",
Subsystem: "oplog",
Name: "entries_filtered",
Help: "Oplog entries filtered by denylist",
}, []string{"database"})
)

func init() {
Expand Down Expand Up @@ -197,7 +206,7 @@ func (tailer *Tailer) tailOnce(out []chan<- *redispub.Publication, stop <-chan b
continue
}

ts, pubs := tailer.unmarshalEntry(rawData)
ts, pubs := tailer.unmarshalEntry(rawData, tailer.Denylist)

if ts != nil {
lastTimestamp = *ts
Expand Down Expand Up @@ -331,7 +340,7 @@ func closeCursor(cursor *mongo.Cursor) {
//
// The timestamp of the entry is returned so that tailOnce knows the timestamp of the last entry it read, even if it
// ignored it or failed at some later step.
func (tailer *Tailer) unmarshalEntry(rawData bson.Raw) (timestamp *primitive.Timestamp, pubs []*redispub.Publication) {
func (tailer *Tailer) unmarshalEntry(rawData bson.Raw, denylist *sync.Map) (timestamp *primitive.Timestamp, pubs []*redispub.Publication) {
var result rawOplogEntry

err := bson.Unmarshal(rawData, &result)
Expand Down Expand Up @@ -363,6 +372,13 @@ func (tailer *Tailer) unmarshalEntry(rawData bson.Raw) (timestamp *primitive.Tim
database = entries[0].Database
}

if _, denied := denylist.Load(database); denied {
log.Log.Debugw("Skipping oplog entry", "database", database)
metricOplogEntriesFiltered.WithLabelValues(database).Add(1)

return
}

type errEntry struct {
err error
op *oplogEntry
Expand Down
Loading

0 comments on commit 38e75cd

Please sign in to comment.