Skip to content

Commit

Permalink
fixes [skip ci]
Browse files Browse the repository at this point in the history
  • Loading branch information
akurilov committed Jul 13, 2024
1 parent 2d5854c commit 1244634
Show file tree
Hide file tree
Showing 11 changed files with 65 additions and 48 deletions.
9 changes: 6 additions & 3 deletions api/http/handler/callback.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type CallbackHandler interface {

type callbackHandler struct {
topicPrefixBase string
host string
svcConv converter.Service
svcAp activitypub.Service
}
Expand All @@ -34,9 +35,10 @@ const keyHubTopic = "hub.topic"
const linkSelfSuffix = ">; rel=\"self\""
const keyAckCount = "X-Ack-Count"

func NewCallbackHandler(topicPrefixBase string, svcConv converter.Service, svcAp activitypub.Service) CallbackHandler {
func NewCallbackHandler(topicPrefixBase, host string, svcConv converter.Service, svcAp activitypub.Service) CallbackHandler {
return callbackHandler{
topicPrefixBase: topicPrefixBase,
host: host,
svcConv: svcConv,
svcAp: svcAp,
}
Expand Down Expand Up @@ -80,6 +82,7 @@ func (ch callbackHandler) Deliver(ctx *gin.Context) {
ctx.String(http.StatusBadRequest, fmt.Sprintf("invalid self link header value in the request: %s", topic))
return
}
pubKeyId := fmt.Sprintf("https://%s/actor/%s#main-key", ch.host, interestId)

followerUrl, err := url.QueryUnescape(ctx.Query(reader.QueryParamFollower))
if err != nil || followerUrl == "" {
Expand All @@ -88,7 +91,7 @@ func (ch callbackHandler) Deliver(ctx *gin.Context) {
}

var follower vocab.Actor
follower, _, err = ch.svcAp.FetchActor(ctx, vocab.IRI(followerUrl))
follower, _, err = ch.svcAp.FetchActor(ctx, vocab.IRI(followerUrl), pubKeyId)
if err != nil {
ctx.String(http.StatusInternalServerError, fmt.Sprintf("failed to resolve the follower %s: %s", follower, err))
return
Expand Down Expand Up @@ -120,7 +123,7 @@ func (ch callbackHandler) Deliver(ctx *gin.Context) {
a, err = ch.svcConv.ConvertEventToActivity(ctx, evtProto, interestId, &follower)
}
if err == nil {
err = ch.svcAp.SendActivity(ctx, a, follower.Inbox.GetLink())
err = ch.svcAp.SendActivity(ctx, a, follower.Inbox.GetLink(), pubKeyId)
}
if err != nil {
break
Expand Down
18 changes: 14 additions & 4 deletions api/http/handler/inbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,16 @@ import (
type inboxHandler struct {
svcActivityPub activitypub.Service
svc service.Service
host string
}

const limitReqBodyLen = 262_144

func NewInboxHandler(svcActivityPub activitypub.Service, svc service.Service) Handler {
func NewInboxHandler(svcActivityPub activitypub.Service, svc service.Service, host string) Handler {
return inboxHandler{
svcActivityPub: svcActivityPub,
svc: svc,
host: host,
}
}

Expand Down Expand Up @@ -65,9 +67,18 @@ func (h inboxHandler) Handle(ctx *gin.Context) {
return
}

actorIdLocal := ctx.Param("id")
var pubKeyId string
switch actorIdLocal {
case "":
pubKeyId = fmt.Sprintf("https://%s/actor#main-key", h.host)
default:
pubKeyId = fmt.Sprintf("https://%s/actor/%s#main-key", h.host, actorIdLocal)
}

var actor vocab.Actor
var actorTags util.ObjectTags
actor, actorTags, err = h.svcActivityPub.FetchActor(ctx, activity.Actor.GetLink())
actor, actorTags, err = h.svcActivityPub.FetchActor(ctx, activity.Actor.GetLink(), pubKeyId)
if err != nil {
ctx.String(http.StatusInternalServerError, err.Error())
return
Expand All @@ -80,9 +91,8 @@ func (h inboxHandler) Handle(ctx *gin.Context) {
return
}

actorIdLocal := ctx.Param("id")
var post func()
post, err = h.svc.HandleActivity(ctx, actorIdLocal, actor, actorTags, activity, tags)
post, err = h.svc.HandleActivity(ctx, actorIdLocal, pubKeyId, actor, actorTags, activity, tags)
switch {
case errors.Is(err, reader.ErrConflict):
ctx.String(http.StatusConflict, err.Error())
Expand Down
8 changes: 4 additions & 4 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ func main() {
ha := handler.NewActorHandler(actor, actorExtraAttrs, clientAwk, "https://awakari.com/sub-details.html?id=", cfg.Api)

// WebFinger
wf := apiHttp.WebFinger{
wfDefault := apiHttp.WebFinger{
Subject: fmt.Sprintf("acct:%s@%s", cfg.Api.Actor.Name, cfg.Api.Http.Host),
Links: []apiHttp.WebFingerLink{
{
Expand All @@ -219,10 +219,10 @@ func main() {
},
},
}
hwf := handler.NewWebFingerHandler(wf, cfg.Api.Http.Host, clientAwk)
hwf := handler.NewWebFingerHandler(wfDefault, cfg.Api.Http.Host, clientAwk)

// handlers for inbox, outbox, following, followers
hi := handler.NewInboxHandler(svcActivityPub, svc)
hi := handler.NewInboxHandler(svcActivityPub, svc, cfg.Api.Http.Host)
ho := handler.NewOutboxHandler(svcReader, svcConv, fmt.Sprintf("https://%s/outbox", cfg.Api.Http.Host))
hoDummy := handler.NewDummyCollectionHandler(vocab.OrderedCollectionPage{
ID: vocab.IRI(fmt.Sprintf("https://%s/outbox", cfg.Api.Http.Host)),
Expand Down Expand Up @@ -266,7 +266,7 @@ func main() {
}
}()

hc := handler.NewCallbackHandler(cfg.Api.Reader.Uri, svcConv, svcActivityPub)
hc := handler.NewCallbackHandler(cfg.Api.Reader.Uri, cfg.Api.Http.Host, svcConv, svcActivityPub)

log.Info(fmt.Sprintf("starting to listen the HTTP API @ port #%d...", cfg.Api.Reader.CallBack.Port))
internalCallbacks := gin.Default()
Expand Down
12 changes: 6 additions & 6 deletions service/activitypub/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,15 @@ func (l logging) ResolveActorLink(ctx context.Context, host, name string) (self
return
}

func (l logging) FetchActor(ctx context.Context, addr vocab.IRI) (actor vocab.Actor, tags util.ObjectTags, err error) {
actor, tags, err = l.svc.FetchActor(ctx, addr)
l.log.Log(ctx, logLevel(err), fmt.Sprintf("activitypub.FetchActor(addr=%s): {Id;%+v, Inbox:%+v, Tags:%+v}, %s", addr, actor.ID, actor.Inbox, tags, err))
func (l logging) FetchActor(ctx context.Context, addr vocab.IRI, pubKeyId string) (actor vocab.Actor, tags util.ObjectTags, err error) {
actor, tags, err = l.svc.FetchActor(ctx, addr, pubKeyId)
l.log.Log(ctx, logLevel(err), fmt.Sprintf("activitypub.FetchActor(addr=%s, pubKeyId=%s): {Id;%+v, Inbox:%+v, Tags:%+v}, %s", addr, pubKeyId, actor.ID, actor.Inbox, tags, err))
return
}

func (l logging) SendActivity(ctx context.Context, a vocab.Activity, inbox vocab.IRI) (err error) {
err = l.svc.SendActivity(ctx, a, inbox)
l.log.Log(ctx, logLevel(err), fmt.Sprintf("activitypub.SendActivity(a=%+v, inbox=%s): %s", a, inbox, err))
func (l logging) SendActivity(ctx context.Context, a vocab.Activity, inbox vocab.IRI, pubKeyId string) (err error) {
err = l.svc.SendActivity(ctx, a, inbox, pubKeyId)
l.log.Log(ctx, logLevel(err), fmt.Sprintf("activitypub.SendActivity(a=%v, inbox=%s, pubKeyId=%s): %s", a, inbox, pubKeyId, err))
return
}

Expand Down
4 changes: 2 additions & 2 deletions service/activitypub/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func (m mock) ResolveActorLink(ctx context.Context, host, name string) (self voc
return
}

func (m mock) FetchActor(ctx context.Context, self vocab.IRI) (a vocab.Actor, tags util.ObjectTags, err error) {
func (m mock) FetchActor(ctx context.Context, self vocab.IRI, pubKeyId string) (a vocab.Actor, tags util.ObjectTags, err error) {
switch self {
case "https://fail.social/users/johndoe":
err = ErrActorFetch
Expand Down Expand Up @@ -53,7 +53,7 @@ func (m mock) FetchActor(ctx context.Context, self vocab.IRI) (a vocab.Actor, ta
return
}

func (m mock) SendActivity(ctx context.Context, a vocab.Activity, inbox vocab.IRI) (err error) {
func (m mock) SendActivity(ctx context.Context, a vocab.Activity, inbox vocab.IRI, pubKeyId string) (err error) {
switch inbox {
case "https://host.fail/users/johndoe/inbox":
err = ErrActivitySend
Expand Down
17 changes: 9 additions & 8 deletions service/activitypub/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ import (

type Service interface {
ResolveActorLink(ctx context.Context, host, name string) (self vocab.IRI, err error)
FetchActor(ctx context.Context, addr vocab.IRI) (a vocab.Actor, tags util.ObjectTags, err error)
SendActivity(ctx context.Context, a vocab.Activity, inbox vocab.IRI) (err error)
FetchActor(ctx context.Context, addr vocab.IRI, pubKeyId string) (a vocab.Actor, tags util.ObjectTags, err error)
SendActivity(ctx context.Context, a vocab.Activity, inbox vocab.IRI, pubKeyId string) (err error)
nodeinfo.Resolver
}

Expand Down Expand Up @@ -94,7 +94,7 @@ func (svc service) ResolveActorLink(ctx context.Context, host, name string) (sel
return
}

func (svc service) FetchActor(ctx context.Context, addr vocab.IRI) (actor vocab.Actor, tags util.ObjectTags, err error) {
func (svc service) FetchActor(ctx context.Context, addr vocab.IRI, pubKeyId string) (actor vocab.Actor, tags util.ObjectTags, err error) {
//
var req *http.Request
req, err = http.NewRequestWithContext(ctx, http.MethodGet, string(addr), nil)
Expand All @@ -112,7 +112,7 @@ func (svc service) FetchActor(ctx context.Context, addr vocab.IRI) (actor vocab.
req.Header.Set("Date", now.Format(http.TimeFormat))
}
//
err = svc.signRequest(req, []byte{})
err = svc.signRequest(req, []byte{}, pubKeyId)
//
if err == nil {
resp, err = svc.clientHttp.Do(req)
Expand All @@ -137,7 +137,7 @@ func (svc service) FetchActor(ctx context.Context, addr vocab.IRI) (actor vocab.
return
}

func (svc service) SendActivity(ctx context.Context, a vocab.Activity, inbox vocab.IRI) (err error) {
func (svc service) SendActivity(ctx context.Context, a vocab.Activity, inbox vocab.IRI, pubKeyId string) (err error) {
//
aFixed, _ := apiHttp.FixContext(a)
var d []byte
Expand All @@ -160,7 +160,7 @@ func (svc service) SendActivity(ctx context.Context, a vocab.Activity, inbox voc
req.Header.Set("Date", now.Format(http.TimeFormat))
}
//
err = svc.signRequest(req, d)
err = svc.signRequest(req, d, pubKeyId)
//
var resp *http.Response
if err == nil {
Expand All @@ -182,7 +182,7 @@ func (svc service) SendActivity(ctx context.Context, a vocab.Activity, inbox voc
return
}

func (svc service) signRequest(req *http.Request, data []byte) (err error) {
func (svc service) signRequest(req *http.Request, data []byte, pubKeyId string) (err error) {
var signer httpsig.Signer
signer, _, err = httpsig.NewSigner(prefs, digestAlgorithm, headersToSign, httpsig.Signature, 120)
var privKey any
Expand All @@ -193,7 +193,8 @@ func (svc service) signRequest(req *http.Request, data []byte) (err error) {
}
}
if err == nil {
err = signer.SignRequest(privKey, fmt.Sprintf("https://%s/actor#main-key", svc.hostname), req, data)
//err = signer.SignRequest(privKey, fmt.Sprintf("https://%s/actor#main-key", svc.hostname), req, data)
err = signer.SignRequest(privKey, pubKeyId, req, data)
if err != nil {
err = fmt.Errorf("failed to sign the follow request: %w", err)
}
Expand Down
3 changes: 2 additions & 1 deletion service/activitypub/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func TestService_FetchActor(t *testing.T) {
t.Skip()
}
svc := NewService(http.DefaultClient, "activitypub.awakari.com", []byte{}, nil)
actor, _, err := svc.FetchActor(context.TODO(), "https://mastodon.social/users/akurilov")
actor, _, err := svc.FetchActor(context.TODO(), "https://mastodon.social/users/akurilov", "https://activitypub.awakari.com/actor#main-key")
assert.Equal(t, "https://mastodon.social/users/akurilov/inbox", actor.Inbox.GetLink().String())
assert.Nil(t, err)
}
Expand All @@ -45,6 +45,7 @@ func TestService_RequestFollow(t *testing.T) {
Object: vocab.IRI("https://mastodon.social/users/akurilov"),
},
"https://mastodon.social/users/akurilov/inbox",
"foo.bar#main.key",
)
assert.Nil(t, err)
}
4 changes: 2 additions & 2 deletions service/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ func (l logging) RequestFollow(ctx context.Context, addr, groupId, userId, subId
return
}

func (l logging) HandleActivity(ctx context.Context, actorIdLocal string, actor vocab.Actor, actorTags util.ObjectTags, activity vocab.Activity, activityTags util.ActivityTags) (post func(), err error) {
post, err = l.svc.HandleActivity(ctx, actorIdLocal, actor, actorTags, activity, activityTags)
func (l logging) HandleActivity(ctx context.Context, actorIdLocal, pubKeyId string, actor vocab.Actor, actorTags util.ObjectTags, activity vocab.Activity, activityTags util.ActivityTags) (post func(), err error) {
post, err = l.svc.HandleActivity(ctx, actorIdLocal, pubKeyId, actor, actorTags, activity, activityTags)
l.log.Log(ctx, logLevel(err), fmt.Sprintf("service.HandleActivity(actorIdLocal=%s, actor.Id=%s, actor.Tags=%d, activity.Type=%s, activity.Tags=%d): err=%s", actorIdLocal, actor.ID, len(actorTags.Tag), activity.Type, len(activityTags.Tag), err))
return
}
Expand Down
2 changes: 1 addition & 1 deletion service/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func (m mock) RequestFollow(ctx context.Context, addr, groupId, userId, subId, t
return
}

func (m mock) HandleActivity(ctx context.Context, actorIdLocal string, actor vocab.Actor, actorTags util.ObjectTags, activity vocab.Activity, tags util.ActivityTags) (post func(), err error) {
func (m mock) HandleActivity(ctx context.Context, actorIdLocal, pubKeyId string, actor vocab.Actor, actorTags util.ObjectTags, activity vocab.Activity, tags util.ActivityTags) (post func(), err error) {
switch actor.ID {
case "fail":
err = storage.ErrInternal
Expand Down
34 changes: 18 additions & 16 deletions service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type Service interface {

HandleActivity(
ctx context.Context,
actorIdLocal string,
actorIdLocal, pubKeyId string,
actor vocab.Actor,
actorTags util.ObjectTags,
activity vocab.Activity,
Expand Down Expand Up @@ -111,10 +111,12 @@ func (svc service) RequestFollow(ctx context.Context, addr, groupId, userId, sub
}
}

pubKeyId := fmt.Sprintf("https://%s/actor#main-key", svc.hostSelf)

var actor vocab.Actor
var actorTags util.ObjectTags
if err == nil {
actor, actorTags, err = svc.ap.FetchActor(ctx, vocab.IRI(addrResolved))
actor, actorTags, err = svc.ap.FetchActor(ctx, vocab.IRI(addrResolved), pubKeyId)
if err != nil {
err = fmt.Errorf("%w: failed to fetch actor: %s, cause: %s", ErrInvalid, addrResolved, err)
}
Expand Down Expand Up @@ -148,7 +150,7 @@ func (svc service) RequestFollow(ctx context.Context, addr, groupId, userId, sub
Actor: vocab.IRI(fmt.Sprintf("https://%s/actor", svc.hostSelf)),
Object: vocab.IRI(addrResolved),
}
err = svc.ap.SendActivity(ctx, activity, actor.Inbox.GetLink())
err = svc.ap.SendActivity(ctx, activity, actor.Inbox.GetLink(), pubKeyId)
if err != nil {
src.Err = err.Error()
_ = svc.stor.Update(ctx, src)
Expand All @@ -160,7 +162,7 @@ func (svc service) RequestFollow(ctx context.Context, addr, groupId, userId, sub

func (svc service) HandleActivity(
ctx context.Context,
actorIdLocal string,
actorIdLocal, pubKeyId string,
actor vocab.Actor,
actorTags util.ObjectTags,
activity vocab.Activity,
Expand All @@ -172,32 +174,32 @@ func (svc service) HandleActivity(
actorId := actor.ID.String()
switch activity.Type {
case vocab.FollowType:
post, err = svc.handleFollowActivity(ctx, actorIdLocal, actorId, activity)
post, err = svc.handleFollowActivity(ctx, actorIdLocal, pubKeyId, actorId, activity)
case vocab.UndoType:
err = svc.handleUndoActivity(ctx, actorIdLocal, actorId, activity)
default:
err = svc.handleSourceActivity(ctx, actorId, actor, actorTags, activity, activityTags)
err = svc.handleSourceActivity(ctx, actorId, pubKeyId, actor, actorTags, activity, activityTags)
}
return
}

func (svc service) handleFollowActivity(ctx context.Context, actorIdLocal, actorId string, activity vocab.Activity) (post func(), err error) {
func (svc service) handleFollowActivity(ctx context.Context, actorIdLocal, pubKeyId, actorId string, activity vocab.Activity) (post func(), err error) {
d, _ := json.Marshal(activity)
fmt.Printf("Follow activity payload: %s\n", d)
cbUrl := svc.makeCallbackUrl(actorId)
err = svc.r.CreateCallback(ctx, actorIdLocal, cbUrl)
var actor vocab.Actor
if err == nil {
actor, _, err = svc.ap.FetchActor(ctx, vocab.IRI(actorId))
actor, _, err = svc.ap.FetchActor(ctx, vocab.IRI(actorId), pubKeyId)
}
if err == nil {
post = func() {
time.Sleep(1 * time.Minute) // TODO tmp test code
time.Sleep(10 * time.Second)
accept := vocab.AcceptNew(vocab.IRI(fmt.Sprintf("https://%s/%s", svc.hostSelf, uuid.NewString())), activity.Object)
accept.Context = vocab.IRI(model.NsAs)
accept.Actor = vocab.ID(fmt.Sprintf("https://%s/actor/%s", svc.hostSelf, actorIdLocal))
accept.Object = activity
_ = svc.ap.SendActivity(ctx, *accept, actor.Inbox.GetLink())
_ = svc.ap.SendActivity(ctx, *accept, actor.Inbox.GetLink(), pubKeyId)
}
}
return
Expand All @@ -219,7 +221,7 @@ func (svc service) makeCallbackUrl(actorId string) (cbUrl string) {

func (svc service) handleSourceActivity(
ctx context.Context,
srcId string,
srcId, pubKeyId string,
actor vocab.Actor,
actorTags util.ObjectTags,
activity vocab.Activity,
Expand Down Expand Up @@ -258,7 +260,7 @@ func (svc service) handleSourceActivity(
err = fmt.Errorf("%w: actor=%+v, activity.Type=%s", ErrNoAccept, actor, activity.Type)
}
case errors.Is(err, storage.ErrNotFound):
err = svc.unfollow(ctx, actor.ID)
err = svc.unfollow(ctx, actor.ID, pubKeyId)
}
return
}
Expand All @@ -274,16 +276,16 @@ func (svc service) List(ctx context.Context, filter model.Filter, limit uint32,
}

func (svc service) Unfollow(ctx context.Context, url vocab.IRI, groupId, userId string) (err error) {
err = svc.unfollow(ctx, url)
err = svc.unfollow(ctx, url, fmt.Sprintf("https://%s/actor#main-key", svc.hostSelf))
if err == nil {
err = svc.stor.Delete(ctx, url.String(), groupId, userId)
}
return
}

func (svc service) unfollow(ctx context.Context, url vocab.IRI) (err error) {
func (svc service) unfollow(ctx context.Context, url vocab.IRI, pubKeyId string) (err error) {
var actor vocab.Actor
actor, _, err = svc.ap.FetchActor(ctx, url)
actor, _, err = svc.ap.FetchActor(ctx, url, pubKeyId)
if err != nil {
err = fmt.Errorf("%w: failed to fetch actor: %s, cause: %s", ErrInvalid, url, err)
}
Expand All @@ -299,7 +301,7 @@ func (svc service) unfollow(ctx context.Context, url vocab.IRI) (err error) {
Object: url,
},
}
err = svc.ap.SendActivity(ctx, activity, actor.Inbox.GetLink())
err = svc.ap.SendActivity(ctx, activity, actor.Inbox.GetLink(), pubKeyId)
}
return
}
Loading

0 comments on commit 1244634

Please sign in to comment.