From d403362e5e61b1c9c56ac07f69796ac793edaec9 Mon Sep 17 00:00:00 2001 From: Adam Shannon Date: Fri, 9 Jun 2023 13:09:58 -0500 Subject: [PATCH] test: verify FileUploaded is sent from events api --- internal/pipeline/events_api_test.go | 126 ++++++++++++++++++++++++ internal/pipeline/file_receiver_test.go | 11 ++- pkg/models/events.go | 4 + 3 files changed, 137 insertions(+), 4 deletions(-) create mode 100644 internal/pipeline/events_api_test.go diff --git a/internal/pipeline/events_api_test.go b/internal/pipeline/events_api_test.go new file mode 100644 index 0000000..4b2d9d2 --- /dev/null +++ b/internal/pipeline/events_api_test.go @@ -0,0 +1,126 @@ +// Licensed to The Moov Authors under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. The Moov Authors licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package pipeline + +import ( + "context" + "fmt" + "net/http" + "os" + "path/filepath" + "testing" + "time" + + "github.com/moov-io/ach" + "github.com/moov-io/achgateway/internal/events" + "github.com/moov-io/achgateway/pkg/compliance" + "github.com/moov-io/achgateway/pkg/models" + "github.com/moov-io/base" + "github.com/moov-io/base/admin" + "gocloud.dev/pubsub" + + "github.com/stretchr/testify/require" +) + +func TestEventsAPI_FileUploaded(t *testing.T) { + adminServer := admin.NewServer(":0") + require.NotNil(t, adminServer) + go adminServer.Listen() + defer adminServer.Shutdown() + + fr := testFileReceiver(t) + fr.RegisterAdminRoutes(adminServer) + + // Write a file that's produced + agg, exists := fr.shardAggregators["testing"] + require.True(t, exists) + require.NotNil(t, agg) + + m, ok := agg.merger.(*filesystemMerging) + require.True(t, ok) + require.NotNil(t, m) + + file, err := ach.ReadFile(filepath.Join("..", "..", "testdata", "ppd-debit.ach")) + require.NoError(t, err) + + fileID := base.ID() + bs, err := compliance.Protect(nil, models.Event{ + Event: models.QueueACHFile{ + FileID: fileID, + ShardKey: "testing", + File: file, + }, + }) + require.NoError(t, err) + + err = fr.Publisher.Send(context.Background(), &pubsub.Message{ + Body: bs, + }) + require.NoError(t, err) + + // Verify the file is pending on disk + require.Eventually(t, func() bool { + found, _ := os.ReadDir(filepath.Join(fr.MergingDir, "mergable", "testing")) + return len(found) > 0 + }, 5*time.Second, 100*time.Millisecond) + + // Isolate the directory (on upload) + fr.TriggerCutoff(t) + + var found []os.DirEntry + require.Eventually(t, func() bool { + found, err = os.ReadDir(fr.MergingDir) + require.NoError(t, err) + return len(found) > 1 + }, 5*time.Second, 100*time.Millisecond) + + var uploadDir string + for i := range found { + if found[i].Name() != "mergable" { + uploadDir = found[i].Name() + } + } + require.NotEmpty(t, uploadDir) + + // Reproduce FileUploaded event + address := fmt.Sprintf("http://%s/shards/testing/pipeline/%s/file-uploaded", adminServer.BindAddr(), uploadDir) + req, err := http.NewRequest("PUT", address, nil) + require.NoError(t, err) + resp, err := http.DefaultClient.Do(req) + require.NoError(t, err) + defer resp.Body.Close() + require.Equal(t, http.StatusOK, resp.StatusCode) + + // Verify that FileUploaded was produced + emitter, ok := agg.eventEmitter.(*events.MockEmitter) + require.True(t, ok) + require.Eventually(t, func() bool { + return len(emitter.Sent()) == 2 + }, 5*time.Second, 100*time.Millisecond) + + // Check fields of the FileUploaded events + sentEvents := emitter.Sent() + for i := range sentEvents { + switch v := sentEvents[i].Event.(type) { + case *models.FileUploaded: + require.Equal(t, fileID, v.FileID) + default: + t.Errorf("unexpected %#v", v) + } + } +} diff --git a/internal/pipeline/file_receiver_test.go b/internal/pipeline/file_receiver_test.go index 8fad672..7227774 100644 --- a/internal/pipeline/file_receiver_test.go +++ b/internal/pipeline/file_receiver_test.go @@ -37,16 +37,17 @@ import ( "github.com/moov-io/base" "github.com/moov-io/base/database" "github.com/moov-io/base/log" - "gocloud.dev/pubsub" "github.com/stretchr/testify/require" + "gocloud.dev/pubsub" ) type TestFileReceiver struct { *FileReceiver - Publisher stream.Publisher - Events *events.MockEmitter + MergingDir string + Publisher stream.Publisher + Events *events.MockEmitter } func (fr *TestFileReceiver) TriggerCutoff(t *testing.T) { @@ -72,6 +73,7 @@ func testFileReceiver(t *testing.T) *TestFileReceiver { ctx := context.Background() logger := log.NewTestLogger() + dir := t.TempDir() conf := &service.Config{ Inbound: service.Inbound{ InMem: &service.InMemory{ @@ -102,7 +104,7 @@ func testFileReceiver(t *testing.T) *TestFileReceiver { Merging: service.Merging{ Storage: storage.Config{ Filesystem: storage.FilesystemConfig{ - Directory: t.TempDir(), + Directory: dir, }, }, }, @@ -124,6 +126,7 @@ func testFileReceiver(t *testing.T) *TestFileReceiver { return &TestFileReceiver{ FileReceiver: fileReceiver, + MergingDir: dir, Publisher: filesTopic, Events: eventEmitter, } diff --git a/pkg/models/events.go b/pkg/models/events.go index c6d131a..048a6d5 100644 --- a/pkg/models/events.go +++ b/pkg/models/events.go @@ -110,6 +110,10 @@ func ReadWithOpts(data []byte, opts *ach.ValidateOpts) (*Event, error) { case "CancelACHFile": var file CancelACHFile event.Event = &file + + case "FileUploaded": + var file FileUploaded + event.Event = &file } err = json.Unmarshal(eventType.Event, event.Event)