Skip to content

Commit

Permalink
test: verify FileUploaded is sent from events api
Browse files Browse the repository at this point in the history
  • Loading branch information
adamdecaf committed Jun 21, 2023
1 parent 3e1b69f commit d403362
Show file tree
Hide file tree
Showing 3 changed files with 137 additions and 4 deletions.
126 changes: 126 additions & 0 deletions internal/pipeline/events_api_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
// Licensed to The Moov Authors under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. The Moov Authors licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package pipeline

import (
"context"
"fmt"
"net/http"
"os"
"path/filepath"
"testing"
"time"

"github.com/moov-io/ach"
"github.com/moov-io/achgateway/internal/events"
"github.com/moov-io/achgateway/pkg/compliance"
"github.com/moov-io/achgateway/pkg/models"
"github.com/moov-io/base"
"github.com/moov-io/base/admin"
"gocloud.dev/pubsub"

"github.com/stretchr/testify/require"
)

func TestEventsAPI_FileUploaded(t *testing.T) {
adminServer := admin.NewServer(":0")
require.NotNil(t, adminServer)
go adminServer.Listen()
defer adminServer.Shutdown()

fr := testFileReceiver(t)
fr.RegisterAdminRoutes(adminServer)

// Write a file that's produced
agg, exists := fr.shardAggregators["testing"]
require.True(t, exists)
require.NotNil(t, agg)

m, ok := agg.merger.(*filesystemMerging)
require.True(t, ok)
require.NotNil(t, m)

file, err := ach.ReadFile(filepath.Join("..", "..", "testdata", "ppd-debit.ach"))
require.NoError(t, err)

fileID := base.ID()
bs, err := compliance.Protect(nil, models.Event{
Event: models.QueueACHFile{
FileID: fileID,
ShardKey: "testing",
File: file,
},
})
require.NoError(t, err)

err = fr.Publisher.Send(context.Background(), &pubsub.Message{
Body: bs,
})
require.NoError(t, err)

// Verify the file is pending on disk
require.Eventually(t, func() bool {
found, _ := os.ReadDir(filepath.Join(fr.MergingDir, "mergable", "testing"))
return len(found) > 0
}, 5*time.Second, 100*time.Millisecond)

// Isolate the directory (on upload)
fr.TriggerCutoff(t)

var found []os.DirEntry
require.Eventually(t, func() bool {
found, err = os.ReadDir(fr.MergingDir)
require.NoError(t, err)
return len(found) > 1
}, 5*time.Second, 100*time.Millisecond)

var uploadDir string
for i := range found {
if found[i].Name() != "mergable" {
uploadDir = found[i].Name()
}
}
require.NotEmpty(t, uploadDir)

// Reproduce FileUploaded event
address := fmt.Sprintf("http://%s/shards/testing/pipeline/%s/file-uploaded", adminServer.BindAddr(), uploadDir)
req, err := http.NewRequest("PUT", address, nil)
require.NoError(t, err)
resp, err := http.DefaultClient.Do(req)
require.NoError(t, err)
defer resp.Body.Close()
require.Equal(t, http.StatusOK, resp.StatusCode)

// Verify that FileUploaded was produced
emitter, ok := agg.eventEmitter.(*events.MockEmitter)
require.True(t, ok)
require.Eventually(t, func() bool {
return len(emitter.Sent()) == 2
}, 5*time.Second, 100*time.Millisecond)

// Check fields of the FileUploaded events
sentEvents := emitter.Sent()
for i := range sentEvents {
switch v := sentEvents[i].Event.(type) {
case *models.FileUploaded:
require.Equal(t, fileID, v.FileID)
default:
t.Errorf("unexpected %#v", v)
}
}
}
11 changes: 7 additions & 4 deletions internal/pipeline/file_receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,17 @@ import (
"github.com/moov-io/base"
"github.com/moov-io/base/database"
"github.com/moov-io/base/log"
"gocloud.dev/pubsub"

"github.com/stretchr/testify/require"
"gocloud.dev/pubsub"
)

type TestFileReceiver struct {
*FileReceiver

Publisher stream.Publisher
Events *events.MockEmitter
MergingDir string
Publisher stream.Publisher
Events *events.MockEmitter
}

func (fr *TestFileReceiver) TriggerCutoff(t *testing.T) {
Expand All @@ -72,6 +73,7 @@ func testFileReceiver(t *testing.T) *TestFileReceiver {
ctx := context.Background()
logger := log.NewTestLogger()

dir := t.TempDir()
conf := &service.Config{
Inbound: service.Inbound{
InMem: &service.InMemory{
Expand Down Expand Up @@ -102,7 +104,7 @@ func testFileReceiver(t *testing.T) *TestFileReceiver {
Merging: service.Merging{
Storage: storage.Config{
Filesystem: storage.FilesystemConfig{
Directory: t.TempDir(),
Directory: dir,
},
},
},
Expand All @@ -124,6 +126,7 @@ func testFileReceiver(t *testing.T) *TestFileReceiver {

return &TestFileReceiver{
FileReceiver: fileReceiver,
MergingDir: dir,
Publisher: filesTopic,
Events: eventEmitter,
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/models/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,10 @@ func ReadWithOpts(data []byte, opts *ach.ValidateOpts) (*Event, error) {
case "CancelACHFile":
var file CancelACHFile
event.Event = &file

case "FileUploaded":
var file FileUploaded
event.Event = &file
}

err = json.Unmarshal(eventType.Event, event.Event)
Expand Down

0 comments on commit d403362

Please sign in to comment.