Skip to content

Commit

Permalink
feat: AddSpadeTasks
Browse files Browse the repository at this point in the history
  • Loading branch information
Jason Cihelka committed Sep 6, 2023
1 parent 229d5b9 commit 97e3e6b
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 2 deletions.
4 changes: 2 additions & 2 deletions integration/spadev0/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func fetchActiveReplicas(ctx context.Context, url string) (*ActiveReplicas, erro
// < 4Tib = 1 cid
// 4 TiB - 16TiB = 2 cids
// 16 TiB - 256 TiB = 3 cids
// TODO: Revise
// TODO: Revise scaling as necessary

Check failure on line 143 in integration/spadev0/main.go

View workflow job for this annotation

GitHub Actions / build

Comment should end in a period (godot)
func numCidsToTest(size int) int {
return int(math.Max(math.Log2(float64(size/1024)), 1))
}
Expand All @@ -154,7 +154,7 @@ func selectReplicasToTest(perProvider map[int]ProviderReplicas) map[int][]Replic
maxReplicas := len(provider.replicas)
numCidsToTest := numCidsToTest(provider.size)

// TODO: Randomize
// TODO: Randomize which CIDs get selected
for i := 0; i < numCidsToTest && i < maxReplicas; i++ {
toTest[providerID] = append(toTest[providerID], provider.replicas[i])
}
Expand Down
98 changes: 98 additions & 0 deletions integration/spadev0/util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package main

import (
"context"
"fmt"
"time"

"github.com/data-preservation-programs/RetrievalBot/integration/filplus/util"
"github.com/data-preservation-programs/RetrievalBot/pkg/env"
"github.com/data-preservation-programs/RetrievalBot/pkg/model"
"github.com/data-preservation-programs/RetrievalBot/pkg/resolver"
"github.com/pkg/errors"
"github.com/rjNemo/underscore"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)

type GroupID struct {
Provider string `bson:"provider"`
PieceCID string `bson:"piece_cid"`
}
type Row struct {
ID GroupID `bson:"_id"`
Document model.DealState `bson:"document"`
}

func AddSpadeTasks(ctx context.Context, requester string, replicasToTest map[int]ProviderReplicas) error {
// Connect to the database
stateMarketDealsClient, err := mongo.
Connect(ctx, options.Client().ApplyURI(env.GetRequiredString(env.StatemarketdealsMongoURI)))
if err != nil {
panic(err)
}
marketDealsCollection := stateMarketDealsClient.
Database(env.GetRequiredString(env.StatemarketdealsMongoDatabase)).
Collection("state_market_deals")

providerCacheTTL := env.GetDuration(env.ProviderCacheTTL, 24*time.Hour)
locationCacheTTL := env.GetDuration(env.LocationCacheTTL, 24*time.Hour)
locationResolver := resolver.NewLocationResolver(env.GetRequiredString(env.IPInfoToken), locationCacheTTL)
providerResolver, err := resolver.NewProviderResolver(
env.GetString(env.LotusAPIUrl, "https://api.node.glif.io/rpc/v0"),
env.GetString(env.LotusAPIToken, ""),
providerCacheTTL)
if err != nil {
panic(err)
}
// Check public IP address
ipInfo, err := resolver.GetPublicIPInfo(ctx, "", "")
if err != nil {
panic(err)
}
logger.With("ipinfo", ipInfo).Infof("Public IP info retrieved")

// For each SPID, grab the market deals for its CIDs and then add tasks
for spid, replica := range replicasToTest {
// Get the relevant market deals for the given SP and replicas
//nolint:govet
pieceCids := underscore.Map(replica.replicas, func(r Replica) string {
return r.PieceCID
})

result, err := marketDealsCollection.Aggregate(ctx, mongo.Pipeline{
{{"$match", bson.D{

Check failure on line 65 in integration/spadev0/util.go

View workflow job for this annotation

GitHub Actions / build

composites: go.mongodb.org/mongo-driver/bson/primitive.E struct literal uses unkeyed fields (govet)
{"provider", bson.D{{"$in", spid}}},

Check failure on line 66 in integration/spadev0/util.go

View workflow job for this annotation

GitHub Actions / build

composites: go.mongodb.org/mongo-driver/bson/primitive.E struct literal uses unkeyed fields (govet)
{"piece_cid", bson.D{{"$in", pieceCids}}},

Check failure on line 67 in integration/spadev0/util.go

View workflow job for this annotation

GitHub Actions / build

composites: go.mongodb.org/mongo-driver/bson/primitive.E struct literal uses unkeyed fields (govet)
{"expiration", bson.D{{"$gt", time.Now()}}},
}}},
{{"$group", bson.D{
{"_id", bson.D{{"provider", "$provider"}, {"piece_cid", "$piece_cid"}}},
{"document", bson.D{{"$first", "$$ROOT"}}},
}}},
})
if err != nil {
return errors.Wrap(err, "failed to query market deals")
}
var rows []Row
err = result.All(ctx, &rows)
if err != nil {
return errors.Wrap(err, "failed to decode market deals")
}

logger.Infow("Market deals retrieved", "count", len(rows))
documents := underscore.Map(rows, func(row Row) model.DealState {
return row.Document
})

// TODO: retrieve_type
// TODO: bitswap/http
tasks, results := util.AddTasks(ctx, requester, ipInfo, documents, locationResolver, *providerResolver)

fmt.Printf("SPID [%d] Tasks: %+v \n Results: %+v \n", spid, tasks, results)

Check failure on line 93 in integration/spadev0/util.go

View workflow job for this annotation

GitHub Actions / build

use of `fmt.Printf` forbidden by pattern `^(fmt\.Print(|f|ln)|print|println)$` (forbidigo)
// TODO: write the tasks and results to the DB
}

return nil
}

0 comments on commit 97e3e6b

Please sign in to comment.