Skip to content

Commit

Permalink
fix: report "ok" for lighthouse backfill sync
Browse files Browse the repository at this point in the history
  • Loading branch information
0x416e746f6e committed Mar 22, 2024
1 parent f6d4fc5 commit 51a013e
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 73 deletions.
24 changes: 12 additions & 12 deletions healthchecker/check_geth.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"net/http"
)

func (h *Healthchecker) checkGeth(ctx context.Context, url string) error {
func (h *Healthchecker) checkGeth(ctx context.Context, url string) *healthcheckResult {
// https://ethereum.org/en/developers/docs/apis/json-rpc/#eth_syncing

const query = `{"jsonrpc":"2.0","method":"eth_syncing","params":[],"id":0}`
Expand All @@ -35,49 +35,49 @@ func (h *Healthchecker) checkGeth(ctx context.Context, url string) error {
bytes.NewReader([]byte(query)),
)
if err != nil {
return err
return &healthcheckResult{err: err}
}
req.Header.Set("accept", "application/json")
req.Header.Set("content-type", "application/json")

res, err := http.DefaultClient.Do(req)
if err != nil {
return err
return &healthcheckResult{err: err}
}
defer res.Body.Close()

body, err := io.ReadAll(res.Body)
if err != nil {
return err
return &healthcheckResult{err: err}
}

if res.StatusCode != http.StatusOK {
return fmt.Errorf(
return &healthcheckResult{err: fmt.Errorf(
"unexpected HTTP status '%d': %s",
res.StatusCode,
string(body),
)
)}
}

var status isNotSyncing
if err := json.Unmarshal(body, &status); err != nil {
var status isSyncing
if err2 := json.Unmarshal(body, &status); err2 != nil {
return fmt.Errorf(
return &healthcheckResult{err: fmt.Errorf(
"failed to parse JSON body '%s': %w",
string(body),
errors.Join(err, err2),
)
)}
}
return fmt.Errorf(
return &healthcheckResult{err: fmt.Errorf(
"geth is still syncing (current: %s, highest: %s)",
status.Result.CurrentBlock,
status.Result.HighestBlock,
)
)}
}
if status.Result { // i.e. it's syncing
return errors.New("geth is (still) syncing")
return &healthcheckResult{err: errors.New("geth is (still) syncing")}
}

return nil
return &healthcheckResult{ok: true}
}
49 changes: 29 additions & 20 deletions healthchecker/check_lighthouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"net/http"
)

func (h *Healthchecker) checkLighthouse(ctx context.Context, url string) error {
func (h *Healthchecker) checkLighthouse(ctx context.Context, url string) *healthcheckResult {
// https://lighthouse-book.sigmaprime.io/api-lighthouse.html#lighthousesyncing
// https://github.com/sigp/lighthouse/blob/v4.5.0/beacon_node/lighthouse_network/src/types/sync_state.rs#L6-L27

Expand Down Expand Up @@ -45,68 +45,77 @@ func (h *Healthchecker) checkLighthouse(ctx context.Context, url string) error {
nil,
)
if err != nil {
return err
return &healthcheckResult{err: err}
}
req.Header.Set("accept", "application/json")

res, err := http.DefaultClient.Do(req)
if err != nil {
return err
return &healthcheckResult{err: err}
}
defer res.Body.Close()

body, err := io.ReadAll(res.Body)
if err != nil {
return err
return &healthcheckResult{err: err}
}

if res.StatusCode != http.StatusOK {
return fmt.Errorf(
return &healthcheckResult{err: fmt.Errorf(
"unexpected HTTP status '%d': %s",
res.StatusCode,
string(body),
)
)}
}

var state stateString
if err := json.Unmarshal(body, &state); err != nil {
var state stateStruct
if err2 := json.Unmarshal(body, &state); err2 != nil {
return fmt.Errorf(
return &healthcheckResult{err: fmt.Errorf(
"failed to parse JSON body '%s': %w",
string(body),
errors.Join(err, err2),
)
)}
}
switch {
case state.Data.BackFillSyncing != nil:
return fmt.Errorf(
//
// BackBillSyncing is "ok" because that's the state lighthouse
// switches to after checkpoint sync is complete.
//
// See: https://lighthouse-book.sigmaprime.io/checkpoint-sync.html#backfilling-blocks
//
return &healthcheckResult{ok: true, err: fmt.Errorf(
"lighthouse is in 'BackFillSyncing' state (completed: %d, remaining: %d)",
state.Data.BackFillSyncing.Completed,
state.Data.BackFillSyncing.Remaining,
)
)}
case state.Data.SyncingFinalized != nil:
return fmt.Errorf(
"lighthouse is in 'SyncingFinalized' state (start_slot: %s, target_slot: %s)",
return &healthcheckResult{err: fmt.Errorf(
"lighthouse is in 'SyncingFinalized' state (start_slot: '%s', target_slot: '%s')",
state.Data.SyncingFinalized.StartSlot,
state.Data.SyncingFinalized.TargetSlot,
)
)}
case state.Data.SyncingHead != nil:
return fmt.Errorf(
"lighthouse is in 'SyncingHead' state (start_slot: %s, target_slot: %s)",
return &healthcheckResult{err: fmt.Errorf(
"lighthouse is in 'SyncingHead' state (start_slot: '%s', target_slot: '%s')",
state.Data.SyncingHead.StartSlot,
state.Data.SyncingHead.TargetSlot,
)
)}
default:
return fmt.Errorf(
return &healthcheckResult{err: fmt.Errorf(
"lighthouse is in unrecognised state: %s",
string(body),
)
)}
}
}
if state.Data != "Synced" {
return fmt.Errorf("lighthouse is in '%s' state", state.Data)
return &healthcheckResult{err: fmt.Errorf(
"lighthouse is not in synced state: %s",
state.Data,
)}
}

return nil
return &healthcheckResult{ok: true}
}
105 changes: 65 additions & 40 deletions healthchecker/healthchecker.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,23 @@ type Healthchecker struct {
log *zap.Logger
timeout time.Duration

monitors []monitor
monitors []healthcheckMonitor
}

type healthcheckMonitor = func(context.Context) *healthcheckResult

type healthcheckResult struct {
ok bool
err error
}

type Config struct {
MonitorGethURL string
MonitorLighthouseURL string
ServeAddress string
Timeout time.Duration
}

type monitor = func(context.Context) error
ServeAddress string
Timeout time.Duration
}

func New(cfg *Config) (*Healthchecker, error) {
h := &Healthchecker{
Expand All @@ -47,15 +53,8 @@ func New(cfg *Config) (*Healthchecker, error) {
if err != nil {
return nil, err
}
h.monitors = append(h.monitors, func(ctx context.Context) error {
if err := h.checkGeth(ctx, rpcURL); err != nil {
return fmt.Errorf(
"error while checking sync-status of geth at '%s': %w",
rpcURL,
err,
)
}
return nil
h.monitors = append(h.monitors, func(ctx context.Context) *healthcheckResult {
return h.checkGeth(ctx, rpcURL)
})
}

Expand All @@ -66,15 +65,8 @@ func New(cfg *Config) (*Healthchecker, error) {
if err != nil {
return nil, err
}
h.monitors = append(h.monitors, func(ctx context.Context) error {
if err := h.checkLighthouse(ctx, syncingURL); err != nil {
return fmt.Errorf(
"error while checking sync-status of lighthouse at '%s': %w",
syncingURL,
err,
)
}
return nil
h.monitors = append(h.monitors, func(ctx context.Context) *healthcheckResult {
return h.checkLighthouse(ctx, syncingURL)
})
}

Expand Down Expand Up @@ -118,8 +110,10 @@ func (h *Healthchecker) Serve() error {
}

func (h *Healthchecker) handleHTTPRequest(w http.ResponseWriter, r *http.Request) {
l := logutils.LoggerFromRequest(r)

count := len(h.monitors)
results := make(chan error, count)
results := make(chan *healthcheckResult, count)

for _, m := range h.monitors {
monitor := m // https://go.dev/blog/loopvar-preview
Expand All @@ -131,32 +125,63 @@ func (h *Healthchecker) handleHTTPRequest(w http.ResponseWriter, r *http.Request
}

errs := []error{}
warns := []error{}
for count > 0 {
count--
if res := <-results; res != nil {
errs = append(errs, res)
if !res.ok {
errs = append(errs, res.err)
} else if res.err != nil {
warns = append(warns, res.err)
}
}
}
close(results)

if len(errs) == 0 {
switch {
case len(errs) == 0 && len(warns) == 0:
return
}

l := logutils.LoggerFromRequest(r)
case len(errs) > 0:
w.WriteHeader(http.StatusInternalServerError)
w.Header().Set("Content-Type", "application/text")

w.Header().Set("Content-Type", "application/text")
w.WriteHeader(http.StatusInternalServerError)
for idx, err := range errs {
line := fmt.Sprintf("%d: %s\n", idx, err)
_, err := w.Write([]byte(line))
if err != nil {
l.Error("Failed to write the response body", zap.Error(err))
for idx, err := range errs {
line := fmt.Sprintf("%d: error: %s\n", idx, err)
_, err := w.Write([]byte(line))
if err != nil {
l.Error("Failed to write the response body", zap.Error(err))
}
}
}
offset := len(errs)
for idx, warn := range warns {
line := fmt.Sprintf("%d: warning: %s\n", offset+idx, warn)
_, err := w.Write([]byte(line))
if err != nil {
l.Error("Failed to write the response body", zap.Error(err))
}
}

l.Warn(
"Failed the healthcheck due to upstream error(s)",
zap.Error(errors.Join(errs...)),
)

l.Warn(
"Failed the healthcheck due to upstream error(s)",
zap.Error(errors.Join(errs...)),
)
case len(errs) == 0 && len(warns) > 0:
w.WriteHeader(http.StatusAccepted)
w.Header().Set("Content-Type", "application/text")

for idx, warn := range warns {
line := fmt.Sprintf("%d: %s\n", idx, warn)
_, err := w.Write([]byte(line))
if err != nil {
l.Error("Failed to write the response body", zap.Error(err))
}
}

l.Warn(
"Failed the healthcheck due to upstream error(s)",
zap.Error(errors.Join(errs...)),
)
}
}
2 changes: 1 addition & 1 deletion httplogger/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func Middleware(logger *zap.Logger, next http.Handler) http.Handler {

// Passing request stats both in-message (for the human reader)
// as well as inside the structured log (for the machine parser)
logger.Info(fmt.Sprintf("%s: %s %s %d", r.URL.Scheme, r.Method, r.URL.EscapedPath(), wrapped.Status()),
logger.Info(fmt.Sprintf("%s %s %d", r.Method, r.URL.EscapedPath(), wrapped.Status()),
zap.Int("durationMs", int(time.Since(start).Milliseconds())),
zap.Int("status", wrapped.Status()),
zap.String("httpRequestID", httpRequestID),
Expand Down

0 comments on commit 51a013e

Please sign in to comment.