diff --git a/integration/filplus/util/util.go b/integration/filplus/util/util.go index f710781..25fba0b 100644 --- a/integration/filplus/util/util.go +++ b/integration/filplus/util/util.go @@ -2,6 +2,9 @@ package util import ( "context" + "strconv" + "time" + "github.com/data-preservation-programs/RetrievalBot/pkg/convert" "github.com/data-preservation-programs/RetrievalBot/pkg/env" "github.com/data-preservation-programs/RetrievalBot/pkg/model" @@ -13,8 +16,6 @@ import ( "github.com/libp2p/go-libp2p/core/peer" "github.com/pkg/errors" "golang.org/x/exp/slices" - "strconv" - "time" ) var logger = logging.Logger("addTasks") diff --git a/integration/spadev0/util.go b/integration/spadev0/util.go index bba9345..e127455 100644 --- a/integration/spadev0/util.go +++ b/integration/spadev0/util.go @@ -2,13 +2,15 @@ package main import ( "context" - "fmt" + "strconv" "time" - "github.com/data-preservation-programs/RetrievalBot/integration/filplus/util" + "github.com/data-preservation-programs/RetrievalBot/pkg/convert" "github.com/data-preservation-programs/RetrievalBot/pkg/env" "github.com/data-preservation-programs/RetrievalBot/pkg/model" + "github.com/data-preservation-programs/RetrievalBot/pkg/requesterror" "github.com/data-preservation-programs/RetrievalBot/pkg/resolver" + "github.com/data-preservation-programs/RetrievalBot/pkg/task" "github.com/pkg/errors" "github.com/rjNemo/underscore" "go.mongodb.org/mongo-driver/bson" @@ -86,13 +88,70 @@ func AddSpadeTasks(ctx context.Context, requester string, replicasToTest map[int return row.Document }) - // TODO: retrieve_type - // TODO: bitswap/http - tasks, results := util.AddTasks(ctx, requester, ipInfo, documents, locationResolver, *providerResolver) - - fmt.Printf("SPID [%d] Tasks: %+v \n Results: %+v \n", spid, tasks, results) - // TODO: write the tasks and results to the DB + prepareSpadeTasks(ctx, requester, ipInfo, documents, locationResolver, *providerResolver) } + // TODO: Write the tasks out to database return nil } + +func prepareSpadeTasks( + ctx context.Context, + requester string, + ipInfo resolver.IPInfo, + documents []model.DealState, + locationResolver resolver.LocationResolver, + providerResolver resolver.ProviderResolver, +) (tasks []interface{}, results []interface{}) { + for _, document := range documents { + providerInfo, err := providerResolver.ResolveProvider(ctx, document.Provider) + if err != nil { + logger.With("provider", document.Provider, "deal_id", document.DealID). + Error("failed to resolve provider") + continue + } + + location, err := locationResolver.ResolveMultiaddrsBytes(ctx, providerInfo.Multiaddrs) + if err != nil { + if errors.As(err, &requesterror.BogonIPError{}) || + errors.As(err, &requesterror.InvalidIPError{}) || + errors.As(err, &requesterror.HostLookupError{}) || + errors.As(err, &requesterror.NoValidMultiAddrError{}) { + + // TODO: addErrorResults + // results = addErrorResults(requester, ipInfo, results, document, providerInfo, location, + // task.NoValidMultiAddrs, err.Error()) + } else { + logger.With("provider", document.Provider, "deal_id", document.DealID, "err", err). + Error("failed to resolve provider location") + } + continue + } + + tasks = append(tasks, task.Task{ + Requester: requester, + Module: task.HTTP, // TODO: Bitswap + Metadata: map[string]string{ + "deal_id": strconv.Itoa(int(document.DealID)), + "client": document.Client, + "retrieve_type": "spade", + "retrieve_size": "1048576"}, + Provider: task.Provider{ + ID: document.Provider, + PeerID: providerInfo.PeerId, + Multiaddrs: convert.MultiaddrsBytesToStringArraySkippingError(providerInfo.Multiaddrs), + City: location.City, + Region: location.Region, + Country: location.Country, + Continent: location.Continent, + }, + Content: task.Content{ + CID: document.PieceCID, + }, + CreatedAt: time.Now().UTC(), + Timeout: env.GetDuration(env.FilplusIntegrationTaskTimeout, 15*time.Second), + }) + } + + return tasks, results +}