Skip to content

Commit

Permalink
OplogToRedis: Persistent Denylist (#79)
Browse files Browse the repository at this point in the history
Persist the configurable denylist across a restart.

The service takes an optional environment variable configuration option `OTR_PG_PERSISTENCE_URL` to a postgres database.
The service will create a table `otr_denylist` with one column (a list of unique denylist entries).
The service will write to the table whenever entries are created or removed.
Since all OTR replicas should have the same denylist, this should be fine- first to add adds, and first to delete deletes.

The service will read the table contents at startup to populate the initial denylist.
This should make the service resilient to restarting after a denylist entry was added.
The HTTP methods should only respond 2xx after writing to the DB, so persistence failures will be known by the calling context.
  • Loading branch information
alex-goodisman committed May 9, 2024
1 parent 0fc3b98 commit f7a2b89
Show file tree
Hide file tree
Showing 13 changed files with 310 additions and 63 deletions.
2 changes: 1 addition & 1 deletion default.nix
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ buildGoModule {
'';

# update: set value to an empty string and run `nix build`. This will download Go, fetch the dependencies and calculates their hash.
vendorHash = "sha256-Vh7O0iMPG6nAvcyv92h5TVZS2awnR0vz75apyzJeu4c=";
vendorHash = "sha256-S7/phL8nEYNVeDPqGjh3OAqVB8nOmYk0XDhD7op3fa4=";

nativeBuildInputs = [ installShellFiles ];
doCheck = false;
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ require (
github.com/juju/loggo v0.0.0-20200526014432-9ce3a2e09b5e // indirect
github.com/juju/utils/v2 v2.0.0-20200923005554-4646bfea2ef1 // indirect
github.com/klauspost/compress v1.13.6 // indirect
github.com/lib/pq v1.10.9 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,8 @@ github.com/kvz/logstreamer v0.0.0-20201023134116-02d20f4338f5 h1:dkCjlgGN81ahDFt
github.com/kvz/logstreamer v0.0.0-20201023134116-02d20f4338f5/go.mod h1:8/LTPeDLaklcUjgSQBHbhBF1ibKAFxzS5o+H7USfMSA=
github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc=
github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw=
github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/lunixbochs/vtclean v0.0.0-20160125035106-4fbf7632a2c6/go.mod h1:pHhQNgMf3btfWnGBVipUOjRYhoOsdGqdm/+2c2E2WMI=
github.com/masterzen/azure-sdk-for-go v3.2.0-beta.0.20161014135628-ee4f0065d00c+incompatible/go.mod h1:mf8fjOu33zCqxUjuiU3I8S1lJMyEAlH+0F2+M5xl3hE=
github.com/masterzen/simplexml v0.0.0-20160608183007-4572e39b1ab9/go.mod h1:kCEbxUJlNDEBNbdQMkPSp6yaKcRXVI6f4ddk8Riv4bc=
Expand Down
62 changes: 14 additions & 48 deletions integration-tests/acceptance/denylist_http_test.go
Original file line number Diff line number Diff line change
@@ -1,87 +1,53 @@
package main

import (
"bytes"
"encoding/json"
"io"
"net/http"
"os"
"reflect"
"testing"
)

func doRequest(method string, path string, t *testing.T, expectedCode int) interface{} {
req, err := http.NewRequest(method, os.Getenv("OTR_URL")+path, &bytes.Buffer{})
if err != nil {
t.Fatalf("Error creating req: %s", err)
}
req.Header.Set("Content-Type", "application/json")
resp, err := (&http.Client{}).Do(req)
if err != nil {
t.Fatalf("Error sending request: %s", err)
}

defer resp.Body.Close()

respBody, err := io.ReadAll(resp.Body)
if err != nil {
t.Fatalf("Error eceiving response body: %s", err)
}

if resp.StatusCode != expectedCode {
t.Fatalf("Expected status code %d, but got %d.\nBody was: %s", expectedCode, resp.StatusCode, respBody)
}

if expectedCode == 200 {
var data interface{}
err = json.Unmarshal(respBody, &data)
if err != nil {
t.Fatalf("Error parsing JSON response: %s", err)
}

return data
}
return nil
}
"github.com/tulip/oplogtoredis/integration-tests/helpers"
)

// Test the /denylist HTTP operations
func TestDenyList(t *testing.T) {
baseURL := os.Getenv("OTR_URL")

// GET empty list of rules
data := doRequest("GET", "/denylist", t, 200)
data := helpers.DoRequest("GET", baseURL, "/denylist", t, 200)
if !reflect.DeepEqual(data, []interface{}{}) {
t.Fatalf("Expected empty list from blank GET, but got %#v", data)
}
// PUT new rule
doRequest("PUT", "/denylist/abc", t, 201)
helpers.DoRequest("PUT", baseURL, "/denylist/abc", t, 201)
// GET list with new rule in it
data = doRequest("GET", "/denylist", t, 200)
data = helpers.DoRequest("GET", baseURL, "/denylist", t, 200)
if !reflect.DeepEqual(data, []interface{}{"abc"}) {
t.Fatalf("Expected singleton from GET, but got %#v", data)
}
// GET existing rule
data = doRequest("GET", "/denylist/abc", t, 200)
data = helpers.DoRequest("GET", baseURL, "/denylist/abc", t, 200)
if !reflect.DeepEqual(data, "abc") {
t.Fatalf("Expected matched body from GET, but got %#v", data)
}
// PUT second rule
doRequest("PUT", "/denylist/def", t, 201)
helpers.DoRequest("PUT", baseURL, "/denylist/def", t, 201)
// GET second rule
data = doRequest("GET", "/denylist/def", t, 200)
data = helpers.DoRequest("GET", baseURL, "/denylist/def", t, 200)
if !reflect.DeepEqual(data, "def") {
t.Fatalf("Expected matched body from GET, but got %#v", data)
}
// GET list with both rules
data = doRequest("GET", "/denylist", t, 200)
data = helpers.DoRequest("GET", baseURL, "/denylist", t, 200)
// check both permutations, in case the server reordered them
if !reflect.DeepEqual(data, []interface{}{"abc", "def"}) && !reflect.DeepEqual(data, []interface{}{"def", "abc"}) {
t.Fatalf("Expected doubleton from GET, but got %#v", data)
}
// DELETE first rule
doRequest("DELETE", "/denylist/abc", t, 204)
helpers.DoRequest("DELETE", baseURL, "/denylist/abc", t, 204)
// GET first rule
doRequest("GET", "/denylist/abc", t, 404)
helpers.DoRequest("GET", baseURL, "/denylist/abc", t, 404)
// GET list with only second rule
data = doRequest("GET", "/denylist", t, 200)
data = helpers.DoRequest("GET", baseURL, "/denylist", t, 200)
if !reflect.DeepEqual(data, []interface{}{"def"}) {
t.Fatalf("Expected singleton from GET, but got %#V", data)
}
Expand Down
7 changes: 5 additions & 2 deletions integration-tests/acceptance/denylist_oplog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,16 @@ package main

import (
"context"
"os"
"testing"

"github.com/tulip/oplogtoredis/integration-tests/helpers"
"go.mongodb.org/mongo-driver/bson"
)

func TestDenyOplog(t *testing.T) {
baseURL := os.Getenv("OTR_URL")

harness := startHarness()
defer harness.stop()

Expand All @@ -33,7 +36,7 @@ func TestDenyOplog(t *testing.T) {
"tests.Foo::id1": {expectedMessage1},
})

doRequest("PUT", "/denylist/tests", t, 201)
helpers.DoRequest("PUT", baseURL, "/denylist/tests", t, 201)

_, err = harness.mongoClient.Collection("Foo").InsertOne(context.Background(), bson.M{
"_id": "id2",
Expand All @@ -46,7 +49,7 @@ func TestDenyOplog(t *testing.T) {
// second message should not have been received, since it got denied
harness.verify(t, map[string][]helpers.OTRMessage{})

doRequest("DELETE", "/denylist/tests", t, 204)
helpers.DoRequest("DELETE", baseURL, "/denylist/tests", t, 204)

_, err = harness.mongoClient.Collection("Foo").InsertOne(context.Background(), bson.M{
"_id": "id3",
Expand Down
1 change: 1 addition & 0 deletions integration-tests/fault-injection/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ RUN apt-get update && \
apt-get install -y \
jq \
netcat \
postgresql \
musl && \
rm -rf /var/lib/apt/lists/*

Expand Down
74 changes: 74 additions & 0 deletions integration-tests/fault-injection/denylist_persistence_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package main

import (
"fmt"
"reflect"
"testing"
"time"

"github.com/tulip/oplogtoredis/integration-tests/fault-injection/harness"
"github.com/tulip/oplogtoredis/integration-tests/helpers"
)

// This test restarts oplogtoredis after adding denylist entries.
// We expect the entries to still be there afterward, and be removable.
func TestDenylistPersistence(t *testing.T) {
mongo := harness.StartMongoServer()
defer mongo.Stop()

// Sleeping here for a while as the initial connection seems to be unreliable
time.Sleep(time.Second * 1)

redis := harness.StartRedisServer()
defer redis.Stop()

pg := harness.StartPostgresServer()
defer pg.Stop()

// wait before starting OTR for the auth changes to take effects
time.Sleep(3 * time.Second)

otr := harness.StartOTRProcessWithEnv(mongo.Addr, redis.Addr, 9000, []string{
fmt.Sprintf("OTR_PG_PERSISTENCE_URL=%s", pg.ConnStr),
})
defer otr.Stop()

time.Sleep(3 * time.Second)

baseURL := "http://localhost:9000"
// PUT new rule
helpers.DoRequest("PUT", baseURL, "/denylist/abc", t, 201)
// PUT second rule
helpers.DoRequest("PUT", baseURL, "/denylist/def", t, 201)
// GET list with both rules
data := helpers.DoRequest("GET", baseURL, "/denylist", t, 200)
// check both permutations, in case the server reordered them
if !reflect.DeepEqual(data, []interface{}{"abc", "def"}) && !reflect.DeepEqual(data, []interface{}{"def", "abc"}) {
t.Fatalf("Expected doubleton from GET, but got %#v", data)
}

otr.Stop()
time.Sleep(3 * time.Second)
otr.Start()

time.Sleep(3 * time.Second)

// denylist should have persisted across the restart

// GET list with both rules
data = helpers.DoRequest("GET", baseURL, "/denylist", t, 200)
// check both permutations, in case the server reordered them
if !reflect.DeepEqual(data, []interface{}{"abc", "def"}) && !reflect.DeepEqual(data, []interface{}{"def", "abc"}) {
t.Fatalf("Expected doubleton from GET, but got %#v", data)
}

// denylist should still be modifiable

// DELETE first rule
helpers.DoRequest("DELETE", baseURL, "/denylist/abc", t, 204)
// GET list with only second rule
data = helpers.DoRequest("GET", baseURL, "/denylist", t, 200)
if !reflect.DeepEqual(data, []interface{}{"def"}) {
t.Fatalf("Expected singleton from GET, but got %#V", data)
}
}
53 changes: 53 additions & 0 deletions integration-tests/fault-injection/harness/postgres.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package harness

import (
"os/exec"
)

type PostgresServer struct {
ConnStr string
}

func runCommandWithLogs(name string, args ...string) {
cmd := exec.Command(name, args...)
cmd.Stderr = makeLogStreamer("postgres", "stderr")
cmd.Stdout = makeLogStreamer("postgres", "stdout")
err := cmd.Start()
if err != nil {
panic("Error starting up postgres: " + err.Error())
}
}

func StartPostgresServer() *PostgresServer {
runCommandWithLogs(
"pg_ctlcluster",
"11",
"main",
"start",
)

waitTCP("127.0.0.1:5432")

runCommandWithLogs(
"runuser",
"-u",
"postgres",
"--",
"psql",
"-c",
"ALTER USER postgres WITH PASSWORD 'postgres';",
)

return &PostgresServer{
ConnStr: "postgres://postgres:postgres@localhost/postgres",
}
}

func (server *PostgresServer) Stop() {
runCommandWithLogs(
"pg_ctlcluster",
"11",
"main",
"stop",
)
}
43 changes: 43 additions & 0 deletions integration-tests/helpers/http.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package helpers

import (
"bytes"
"encoding/json"
"io"
"net/http"
"testing"
)

func DoRequest(method string, baseURL string, path string, t *testing.T, expectedCode int) interface{} {
req, err := http.NewRequest(method, baseURL+path, &bytes.Buffer{})
if err != nil {
t.Fatalf("Error creating req: %s", err)
}
req.Header.Set("Content-Type", "application/json")
resp, err := (&http.Client{}).Do(req)
if err != nil {
t.Fatalf("Error sending request: %s", err)
}

defer resp.Body.Close()

respBody, err := io.ReadAll(resp.Body)
if err != nil {
t.Fatalf("Error eceiving response body: %s", err)
}

if resp.StatusCode != expectedCode {
t.Fatalf("Expected status code %d, but got %d.\nBody was: %s", expectedCode, resp.StatusCode, respBody)
}

if expectedCode == 200 {
var data interface{}
err = json.Unmarshal(respBody, &data)
if err != nil {
t.Fatalf("Error parsing JSON response: %s", err)
}

return data
}
return nil
}
7 changes: 7 additions & 0 deletions lib/config/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type oplogtoredisConfiguration struct {
OplogV2ExtractSubfieldChanges bool `default:"false" envconfig:"OPLOG_V2_EXTRACT_SUBFIELD_CHANGES"`
WriteParallelism int `default:"1" split_words:"true"`
ReadParallelism int `default:"1" split_words:"true"`
PostgresPersistenceURL string `default:"" envconfig:"PG_PERSISTENCE_URL"`
}

var globalConfig *oplogtoredisConfiguration
Expand Down Expand Up @@ -149,6 +150,12 @@ func ReadParallelism() int {
return globalConfig.ReadParallelism
}

// PostgresPersistenceURL is the optional configuration for persisting a denylist entry to a postgres database
// If configured, the denylist will be written to the DB on every change, and loaded on startup
func PostgresPersistenceURL() string {
return globalConfig.PostgresPersistenceURL
}

// ParseEnv parses the current environment variables and updates the stored
// configuration. It is *not* threadsafe, and should just be called once
// at the start of the program.
Expand Down
Loading

0 comments on commit f7a2b89

Please sign in to comment.