Skip to content

Commit

Permalink
Corrected Bugs (#212)
Browse files Browse the repository at this point in the history
1- Added secretsPath as a cli paramter instead of fixed to /internal
2- increased allowed pings to 3 times max ping to allow a peer retry pinging another one  3 times just in case
3- if vote ends in error that AlreadyVoted, change the peer status in peerList to Unknown
4- maxPingTime=100 instead of 20
5- Corrected the usage of bl.fetchCheckTicker
6- Corrected PoolJoin for mobile and separated HandlePoolJoin for blox
  • Loading branch information
ehsan6sha committed Jan 22, 2024
1 parent 6e4e575 commit 7457446
Show file tree
Hide file tree
Showing 7 changed files with 267 additions and 85 deletions.
265 changes: 204 additions & 61 deletions blockchain/bl_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,93 @@ func (bl *FxBlockchain) PoolCreate(ctx context.Context, to peer.ID, r PoolCreate
}
}

func (bl *FxBlockchain) HandlePoolJoin(method string, action string, from peer.ID, w http.ResponseWriter, r *http.Request) {
// This handles the join request sent from client for a blox which is not part of the pool yet
log := log.With("action", action, "from", from)
var req PoolJoinRequest
var res PoolJoinResponse

defer r.Body.Close()

if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
log.Debug("cannot parse request body: %v", err)
http.Error(w, "", http.StatusBadRequest)
return
}

//TODO: Ensure it is optimized for long-running calls
ctx, cancel := context.WithTimeout(r.Context(), time.Second*time.Duration(bl.timeout))
defer cancel()
response, statusCode, err := bl.callBlockchain(ctx, method, action, &req)
if err != nil {
poolID := req.PoolID
poolIDStr := strconv.Itoa(poolID)
requestSubmitted, err := bl.checkIfUserHasOpenPoolRequests(ctx, poolIDStr)
if err == nil && requestSubmitted {
errUpdateConfig := bl.updatePoolName(poolIDStr)
errPingServer := bl.StartPingServer(ctx)
if errUpdateConfig != nil && errPingServer != nil {
errMsg := map[string]interface{}{
"message": "Pool Join is submitted but Error in Starting Ping Server and updating Config",
"description": fmt.Sprintf("Error in Ping server: %s , Error in updateConfig: %s", errPingServer.Error(), errUpdateConfig.Error()),
}
w.WriteHeader(http.StatusExpectationFailed)
json.NewEncoder(w).Encode(errMsg)
return
} else if errUpdateConfig != nil {
errMsg := map[string]interface{}{
"message": "Pool Join is submitted but Error in updating Config",
"description": fmt.Sprintf("Error in updateConfig: %s", errUpdateConfig.Error()),
}
w.WriteHeader(http.StatusExpectationFailed)
json.NewEncoder(w).Encode(errMsg)
return
} else if errPingServer != nil {
errMsg := map[string]interface{}{
"message": "Pool Join is submitted but Error in Ping Server Start",
"description": fmt.Sprintf("Error in PingServer: %s", errPingServer.Error()),
}
w.WriteHeader(http.StatusExpectationFailed)
json.NewEncoder(w).Encode(errMsg)
return
}
statusCode = http.StatusAccepted
err = nil
response = []byte(fmt.Sprintf("{\"account\":\"\",\"pool_id\":%d}", req.PoolID))
}
}
if statusCode == http.StatusOK {
statusCode = http.StatusAccepted
}
log.Debugw("callblockchain response in JoinPool", "statusCode", statusCode, "response", response, "err", err)
// If status code is not 200, attempt to format the response as JSON
if statusCode != http.StatusAccepted || err != nil {
w.WriteHeader(statusCode)
// Try to parse the error and format it as JSON
var errMsg map[string]interface{}
if jsonErr := json.Unmarshal(response, &errMsg); jsonErr != nil {
// If the response isn't JSON or can't be parsed, use a generic message
errMsg = map[string]interface{}{
"message": "An error occurred",
"description": err.Error(),
}
}
json.NewEncoder(w).Encode(errMsg)
return
}
poolIDStr := strconv.Itoa(req.PoolID)
bl.processSuccessfulPoolJoinRequest(ctx, poolIDStr, bl.h.ID())
w.WriteHeader(statusCode)
err1 := json.Unmarshal(response, &res)
if err1 != nil {
log.Error("failed to format response: %v", err1)
}

if err := json.NewEncoder(w).Encode(res); err != nil {
log.Error("failed to write response: %v", err)
}
}

func (bl *FxBlockchain) PoolJoin(ctx context.Context, to peer.ID, r PoolJoinRequest) ([]byte, error) {

if bl.allowTransientConnection {
Expand All @@ -76,38 +163,8 @@ func (bl *FxBlockchain) PoolJoin(ctx context.Context, to peer.ID, r PoolJoinRequ
b, err := io.ReadAll(resp.Body)
switch {
case err != nil:
poolID := r.PoolID
poolIDStr := strconv.Itoa(poolID)
requestSubmitted, err := bl.checkIfUserHasOpenPoolRequests(ctx, poolIDStr)
if err == nil && requestSubmitted {
err := bl.updatePoolName(poolIDStr)
if err != nil {
return []byte("{}"), err
}
err = bl.StartPingServer(ctx)
if err != nil {
return []byte("{}"), err
}
bl.processSuccessfulPoolJoinRequest(ctx, poolIDStr, to)
return []byte("{}"), nil
}
return nil, err
case resp.StatusCode != http.StatusAccepted:
poolID := r.PoolID
poolIDStr := strconv.Itoa(poolID)
requestSubmitted, err := bl.checkIfUserHasOpenPoolRequests(ctx, poolIDStr)
if err == nil && requestSubmitted {
err := bl.updatePoolName(poolIDStr)
if err != nil {
return []byte("{}"), err
}
err = bl.StartPingServer(ctx)
if err != nil {
return []byte("{}"), err
}
bl.processSuccessfulPoolJoinRequest(ctx, poolIDStr, to)
return []byte("{}"), nil
}
// Attempt to parse the body as JSON.
if jsonErr := json.Unmarshal(b, &apiError); jsonErr != nil {
// If we can't parse the JSON, return the original body in the error.
Expand All @@ -116,17 +173,6 @@ func (bl *FxBlockchain) PoolJoin(ctx context.Context, to peer.ID, r PoolJoinRequ
// Return the parsed error message and description.
return nil, fmt.Errorf("unexpected response: %d %s - %s", resp.StatusCode, apiError.Message, apiError.Description)
default:
poolID := r.PoolID
poolIDStr := strconv.Itoa(poolID)
err := bl.updatePoolName(poolIDStr)
if err != nil {
return b, err
}
err = bl.StartPingServer(ctx)
if err != nil {
return b, err
}
bl.processSuccessfulPoolJoinRequest(ctx, poolIDStr, to)
return b, nil
}
}
Expand All @@ -138,7 +184,7 @@ func (bl *FxBlockchain) processSuccessfulPoolJoinRequest(ctx context.Context, po
log.Errorw("Error fetching and populating users", "err", err)
}
bl.stopFetchUsersAfterJoinChan = make(chan struct{})
ticker := time.NewTicker(bl.fetchInterval * time.Minute)
ticker := time.NewTicker(bl.fetchInterval)
defer ticker.Stop()
if bl.wg != nil {
log.Debug("called wg.Add in PoolJoin ticker")
Expand Down Expand Up @@ -174,20 +220,24 @@ func (bl *FxBlockchain) processSuccessfulPoolJoinRequest(ctx context.Context, po
}
}
}()
if bl.a != nil {
if bl.wg != nil {
log.Debug("called wg.Add in PoolJoin ticker2")
bl.wg.Add(1)
}
go func() {
// TODO: THIS METHOD BELOW NEEDS TO RE_INITIALIZE ANNONCEMENTS WITH NEW TOPIC ND START IT FIRST
/*
if bl.a != nil {
if bl.wg != nil {
log.Debug("Called wg.Done in PoolJoin ticker2")
defer bl.wg.Done() // Decrement the counter when the goroutine completes
log.Debug("called wg.Add in PoolJoin ticker2")
bl.wg.Add(1)
}
defer log.Debug("PoolJoin ticker2 go routine is ending")
bl.a.AnnounceJoinPoolRequestPeriodically(ctx)
}()
}
go func() {
if bl.wg != nil {
log.Debug("Called wg.Done in PoolJoin ticker2")
defer bl.wg.Done() // Decrement the counter when the goroutine completes
}
defer log.Debug("PoolJoin ticker2 go routine is ending")
bl.a.AnnounceJoinPoolRequestPeriodically(ctx)
}()
}
*/
}

func (bl *FxBlockchain) StartPingServer(ctx context.Context) error {
Expand Down Expand Up @@ -229,10 +279,8 @@ func (bl *FxBlockchain) PoolCancelJoin(ctx context.Context, to peer.ID, r PoolCa
b, err := io.ReadAll(resp.Body)
switch {
case err != nil:
bl.cleanLeaveJoinPool(ctx, r.PoolID)
return nil, err
case resp.StatusCode != http.StatusAccepted:
bl.cleanLeaveJoinPool(ctx, r.PoolID)
// Attempt to parse the body as JSON.
if jsonErr := json.Unmarshal(b, &apiError); jsonErr != nil {
// If we can't parse the JSON, return the original body in the error.
Expand All @@ -241,11 +289,96 @@ func (bl *FxBlockchain) PoolCancelJoin(ctx context.Context, to peer.ID, r PoolCa
// Return the parsed error message and description.
return nil, fmt.Errorf("unexpected response: %d %s - %s", resp.StatusCode, apiError.Message, apiError.Description)
default:
bl.cleanLeaveJoinPool(ctx, r.PoolID)
return b, nil
}
}

func (bl *FxBlockchain) HandlePoolCancelJoin(method string, action string, from peer.ID, w http.ResponseWriter, r *http.Request) {
// This handles the join request sent from client for a blox which is not part of the pool yet
log := log.With("action", action, "from", from)
var req PoolCancelJoinRequest
var res PoolCancelJoinResponse

defer r.Body.Close()

if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
log.Debug("cannot parse request body: %v", err)
http.Error(w, "", http.StatusBadRequest)
return
}

ctx, cancel := context.WithTimeout(r.Context(), time.Second*time.Duration(bl.timeout))
defer cancel()
response, statusCode, err := bl.callBlockchain(ctx, method, action, &req)
if err != nil {
poolID := req.PoolID
poolIDStr := strconv.Itoa(poolID)
requestSubmitted, err := bl.checkIfUserHasOpenPoolRequests(ctx, poolIDStr)
if err == nil && !requestSubmitted {
errUpdateConfig := bl.updatePoolName("0")
errPingServer := bl.StopPingServer(ctx)
if errUpdateConfig != nil && errPingServer != nil {
errMsg := map[string]interface{}{
"message": "Pool Cancel Join is submitted but Error in Stopping Ping Server and updating Config",
"description": fmt.Sprintf("Error in Ping server: %s , Error in updateConfig: %s", errPingServer.Error(), errUpdateConfig.Error()),
}
w.WriteHeader(http.StatusExpectationFailed)
json.NewEncoder(w).Encode(errMsg)
return
} else if errUpdateConfig != nil {
errMsg := map[string]interface{}{
"message": "Pool Cancel Join is submitted but Error in updating Config",
"description": fmt.Sprintf("Error in updateConfig: %s", errUpdateConfig.Error()),
}
w.WriteHeader(http.StatusExpectationFailed)
json.NewEncoder(w).Encode(errMsg)
return
} else if errPingServer != nil {
errMsg := map[string]interface{}{
"message": "Pool Cancel Join is submitted but Error in Ping Server Stop",
"description": fmt.Sprintf("Error in PingServer: %s", errPingServer.Error()),
}
w.WriteHeader(http.StatusExpectationFailed)
json.NewEncoder(w).Encode(errMsg)
return
}
statusCode = http.StatusAccepted
err = nil
response = []byte(fmt.Sprintf("{\"account\":\"\",\"pool_id\":%d}", req.PoolID))
}
}
if statusCode == http.StatusOK {
statusCode = http.StatusAccepted
}
log.Debugw("callblockchain response in PoolCancelJoin", "statusCode", statusCode, "response", response, "err", err)
// If status code is not 200, attempt to format the response as JSON
if statusCode != http.StatusAccepted || err != nil {
w.WriteHeader(statusCode)
// Try to parse the error and format it as JSON
var errMsg map[string]interface{}
if jsonErr := json.Unmarshal(response, &errMsg); jsonErr != nil {
// If the response isn't JSON or can't be parsed, use a generic message
errMsg = map[string]interface{}{
"message": "An error occurred",
"description": err.Error(),
}
}
json.NewEncoder(w).Encode(errMsg)
return
}

bl.cleanLeaveJoinPool(ctx, req.PoolID)
w.WriteHeader(statusCode)
err1 := json.Unmarshal(response, &res)
if err1 != nil {
log.Error("failed to format response: %v", err1)
}

if err := json.NewEncoder(w).Encode(res); err != nil {
log.Error("failed to write response: %v", err)
}
}

func (bl *FxBlockchain) cleanLeaveJoinPool(ctx context.Context, PoolID int) {
bl.updatePoolName("0")
bl.StopPingServer(ctx)
Expand Down Expand Up @@ -447,6 +580,7 @@ func (bl *FxBlockchain) PoolLeave(ctx context.Context, to peer.ID, r PoolLeaveRe
}

func (bl *FxBlockchain) HandlePoolJoinRequest(ctx context.Context, from peer.ID, account string, topicString string, withMemberListUpdate bool) error {
// This handles the pending pool requests on a member that is already joined the pool
if withMemberListUpdate {
err := bl.FetchUsersAndPopulateSets(ctx, topicString, false)
if err != nil {
Expand All @@ -459,14 +593,23 @@ func (bl *FxBlockchain) HandlePoolJoinRequest(ctx context.Context, from peer.ID,
}
if status == common.Pending {
// Ping
log.Debugw("****** Pinging pending node", "from", bl.h.ID(), "to", from)
averageDuration, successCount, err := bl.p.Ping(ctx, from)
if err != nil {
log.Errorw("An error occurred during ping", "error", err)
return err
}
if bl.maxPingTime == 0 {
//TODO: This should not happen but is happening!
bl.maxPingTime = 200
}
if bl.minPingSuccessCount == 0 {
//TODO: This should not happen but is happening!
bl.minPingSuccessCount = 3
}
vote := averageDuration <= bl.maxPingTime && successCount >= bl.minPingSuccessCount

log.Debugw("Ping result", "averageDuration", averageDuration, "successCount", successCount, "vote", vote)
log.Debugw("Ping result", "averageDuration", averageDuration, "successCount", successCount, "vote", vote, "bl.maxPingTime", bl.maxPingTime, "bl.minPingSuccessCount", bl.minPingSuccessCount)

// Convert topic from string to int
poolID, err := strconv.Atoi(topicString)
Expand All @@ -481,13 +624,13 @@ func (bl *FxBlockchain) HandlePoolJoinRequest(ctx context.Context, from peer.ID,
}

// Call PoolVote method
responseBody, statusCode, err := bl.callBlockchain(ctx, "POST", actionPoolVote, voteRequest)
responseBody, statusCode, err := bl.callBlockchain(ctx, "POST", actionPoolVote, &voteRequest)
if err != nil {
return fmt.Errorf("blockchain call error: %w, status code: %d", err, statusCode)
}

// Check if the status code is OK; if not, handle it as an error
if statusCode != http.StatusOK {
if statusCode != http.StatusOK && statusCode != http.StatusAccepted {
var errMsg map[string]interface{}
if jsonErr := json.Unmarshal(responseBody, &errMsg); jsonErr == nil {
return fmt.Errorf("unexpected response status: %d, message: %s, description: %s",
Expand All @@ -504,7 +647,7 @@ func (bl *FxBlockchain) HandlePoolJoinRequest(ctx context.Context, from peer.ID,
}

// Handle the response as needed
log.Infow("Vote cast successfully", "response", voteResponse, "on", from, "by", bl.h.ID())
log.Infow("Vote cast successfully", "statusCode", statusCode, "voteResponse", voteResponse, "on", from, "by", bl.h.ID())
// Update member status to unknown
bl.membersLock.Lock()
bl.members[from] = common.Unknown
Expand Down
Loading

0 comments on commit 7457446

Please sign in to comment.