Skip to content

Commit

Permalink
Merge pull request #48 from streamingfast/feature/undo
Browse files Browse the repository at this point in the history
Feature/undo Reorg handler for postgres
  • Loading branch information
sduchesneau committed Nov 7, 2023
2 parents 56bec6c + badd052 commit 8af16f7
Show file tree
Hide file tree
Showing 24 changed files with 1,179 additions and 161 deletions.
15 changes: 15 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,21 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## v4.0.0-beta

### Highlights

* This release brings support for managing reorgs in Postgres databases. It is enabled by default, i.e. whenever `--undo-buffer-size` is set to 0 (the new default value).

### Breaking changes

* A change in your SQL schema may be required to keep existing substreams:SQL integrations working:
* The presence of a primary key (single key or composite) is now *MANDATORY* on every table.
* The primary key of each `sf.substreams.sink.database.v1.TableChange` message, generated inside substreams, must now exactly match the primary key defined for that table in the SQL schema.
* You will need to re-run `setup` on your existing PostgreSQL databases to add the `substreams_history` table. You can use the new `--system-tables-only` flag to perform only that.

* Since reorg management is not yet supported on Clickhouse, Clickhouse users will have to set `--undo-buffer-size` to a non-zero value (`12` was the previous default).

## Protodefs v1.0.4

* Added support for `rest_frontend` field with `enabled` boolean flag, aimed at this backend implementation: https://github.com/semiotic-ai/sql-wrapper
Expand Down
14 changes: 6 additions & 8 deletions cmd/substreams-sink-sql/common_flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,24 +58,22 @@ func newDBLoader(
cmd *cobra.Command,
psqlDSN string,
flushInterval time.Duration,
handleReorgs bool,
) (*db.Loader, error) {
moduleMismatchMode, err := db.ParseOnModuleHashMismatch(sflags.MustGetString(cmd, onModuleHashMistmatchFlag))
cli.NoError(err, "invalid mistmatch mode")

dbLoader, err := db.NewLoader(psqlDSN, flushInterval, moduleMismatchMode, zlog, tracer)
dbLoader, err := db.NewLoader(psqlDSN, flushInterval, moduleMismatchMode, handleReorgs, zlog, tracer)
if err != nil {
return nil, fmt.Errorf("new psql loader: %w", err)
}

if err := dbLoader.LoadTables(); err != nil {
var e *db.CursorError
var e *db.SystemTableError
if errors.As(err, &e) {
fmt.Printf("Error validating the cursors table: %s\n", e)
fmt.Println("You can use the following sql schema to create a cursors table")
fmt.Println()
fmt.Println(dbLoader.GetCreateCursorsTableSQL(false))
fmt.Println()
return nil, fmt.Errorf("invalid cursors table")
fmt.Printf("Error validating the system table: %s\n", e)
fmt.Println("Did you run setup ?")
return nil, e
}

return nil, fmt.Errorf("load psql table: %w", err)
Expand Down
2 changes: 1 addition & 1 deletion cmd/substreams-sink-sql/generate_csv.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func generateCsvE(cmd *cobra.Command, args []string) error {
return fmt.Errorf("new base sinker: %w", err)
}

dbLoader, err := newDBLoader(cmd, dsn, 0) // flush interval not used in CSV mode
dbLoader, err := newDBLoader(cmd, dsn, 0, false) // flush interval not used in CSV mode
if err != nil {
return fmt.Errorf("new db loader: %w", err)
}
Expand Down
14 changes: 11 additions & 3 deletions cmd/substreams-sink-sql/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,21 @@ import (
"github.com/streamingfast/substreams/manifest"
)

// ignoreUndoBufferSize is a stateless flag filter. The run command hands an
// instance to sink.AddFlagsToSet and then registers its own
// "undo-buffer-size" flag on the same flag set.
// NOTE(review): presumably the sink library skips registering any flag for
// which IsIgnored returns true — confirm against its documentation.
type ignoreUndoBufferSize struct{}

// IsIgnored reports whether the flag name in is exactly "undo-buffer-size".
// The comparison is case-sensitive; every other name yields false.
func (ignoreUndoBufferSize) IsIgnored(in string) bool {
	const ignoredFlag = "undo-buffer-size"

	return in == ignoredFlag
}

var sinkRunCmd = Command(sinkRunE,
"run <dsn> <manifest> [<start>:<stop>]",
"Runs SQL sink process",
RangeArgs(2, 3),
Flags(func(flags *pflag.FlagSet) {
sink.AddFlagsToSet(flags)
sink.AddFlagsToSet(flags, ignoreUndoBufferSize{})
AddCommonSinkerFlags(flags)

flags.Int("undo-buffer-size", 0, "If non-zero, handling of reorgs in the database is disabled. Instead, a buffer is introduced to only process a blocks once it has been confirmed by that many blocks, introducing a latency but slightly reducing the load on the database when close to head.")
flags.Int("flush-interval", 1000, "When in catch up mode, flush every N blocks")
flags.StringP("endpoint", "e", "", "Specify the substreams endpoint, ex: `mainnet.eth.streamingfast.io:443`")
}),
Expand Down Expand Up @@ -54,7 +61,8 @@ func sinkRunE(cmd *cobra.Command, args []string) error {
return err
}

// "github.com/streamingfast/substreams/manifest"
handleReorgs := sflags.MustGetInt(cmd, "undo-buffer-size") == 0

sink, err := sink.NewFromViper(
cmd,
supportedOutputTypes,
Expand All @@ -69,7 +77,7 @@ func sinkRunE(cmd *cobra.Command, args []string) error {
return fmt.Errorf("new base sinker: %w", err)
}

dbLoader, err := newDBLoader(cmd, dsn, sflags.MustGetDuration(cmd, "flush-interval"))
dbLoader, err := newDBLoader(cmd, dsn, sflags.MustGetDuration(cmd, "flush-interval"), handleReorgs)
if err != nil {
return fmt.Errorf("new db loader: %w", err)
}
Expand Down
11 changes: 9 additions & 2 deletions cmd/substreams-sink-sql/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ var sinkSetupCmd = Command(sinkSetupE,
ExactArgs(2),
Flags(func(flags *pflag.FlagSet) {
flags.Bool("postgraphile", false, "Will append the necessary 'comments' on cursors table to fully support postgraphile")
flags.Bool("system-tables-only", false, "will only create/update the systems tables (cursors, substreams_history) and ignore the schema from the manifest")
flags.Bool("ignore-duplicate-table-errors", false, "[Dev] Use this if you want to ignore duplicate table errors, take caution that this means the 'schemal.sql' file will not have run fully!")
}),
)
Expand All @@ -29,6 +30,7 @@ func sinkSetupE(cmd *cobra.Command, args []string) error {
dsn := args[0]
manifestPath := args[1]
ignoreDuplicateTableErrors := sflags.MustGetBool(cmd, "ignore-duplicate-table-errors")
systemTableOnly := sflags.MustGetBool(cmd, "system-tables-only")

reader, err := manifest.NewReader(manifestPath)
if err != nil {
Expand All @@ -44,12 +46,17 @@ func sinkSetupE(cmd *cobra.Command, args []string) error {
return fmt.Errorf("extract sink config: %w", err)
}

dbLoader, err := db.NewLoader(dsn, 0, db.OnModuleHashMismatchError, zlog, tracer)
dbLoader, err := db.NewLoader(dsn, 0, db.OnModuleHashMismatchError, true, zlog, tracer)
if err != nil {
return fmt.Errorf("new psql loader: %w", err)
}

err = dbLoader.SetupFromBytes(ctx, []byte(sinkConfig.Schema), sflags.MustGetBool(cmd, "postgraphile"))
schema := sinkConfig.Schema
if systemTableOnly {
schema = ""
}

err = dbLoader.Setup(ctx, schema, sflags.MustGetBool(cmd, "postgraphile"))
if err != nil {
if isDuplicateTableError(err) && ignoreDuplicateTableErrors {
zlog.Info("received duplicate table error, script dit not executed succesfully completed")
Expand Down
21 changes: 10 additions & 11 deletions cmd/substreams-sink-sql/tools.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ var sinkToolsCmd = Group(
)

func toolsReadCursorE(cmd *cobra.Command, _ []string) error {
loader := toolsCreateLoader(true)
loader := toolsCreateLoader()

out, err := loader.GetAllCursors(cmd.Context())
cli.NoError(err, "Unable to get all cursors")
Expand All @@ -83,7 +83,7 @@ func toolsReadCursorE(cmd *cobra.Command, _ []string) error {
}

func toolsWriteCursorE(cmd *cobra.Command, args []string) error {
loader := toolsCreateLoader(true)
loader := toolsCreateLoader()

moduleHash := args[0]
opaqueCursor := args[1]
Expand Down Expand Up @@ -114,7 +114,7 @@ func toolsWriteCursorE(cmd *cobra.Command, args []string) error {
}

func toolsDeleteCursorE(cmd *cobra.Command, args []string) error {
loader := toolsCreateLoader(true)
loader := toolsCreateLoader()

moduleHash := ""
if !viper.GetBool("tools-cursor-delete-all") {
Expand Down Expand Up @@ -143,18 +143,17 @@ func toolsDeleteCursorE(cmd *cobra.Command, args []string) error {
return nil
}

func toolsCreateLoader(enforceCursorTable bool) *db.Loader {
func toolsCreateLoader() *db.Loader {
dsn := viper.GetString("tools-global-dsn")
loader, err := db.NewLoader(dsn, 0, db.OnModuleHashMismatchIgnore, zlog, tracer)
loader, err := db.NewLoader(dsn, 0, db.OnModuleHashMismatchIgnore, true, zlog, tracer)
cli.NoError(err, "Unable to instantiate database manager from DSN %q", dsn)

if err := loader.LoadTables(); err != nil {
var cursorError *db.CursorError
if errors.As(err, &cursorError) {
if enforceCursorTable {
fmt.Println("It seems the 'cursors' table does not exit on this database, unable to retrieve DB loader")
os.Exit(1)
}
var systemTableError *db.SystemTableError
if errors.As(err, &systemTableError) {
fmt.Printf("Error validating the system table: %s\n", systemTableError)
fmt.Println("Did you run setup ?")
os.Exit(1)
}

cli.NoError(err, "Unable to load table information from database")
Expand Down
4 changes: 2 additions & 2 deletions db/cursor.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func (l *Loader) InsertCursor(ctx context.Context, moduleHash string, c *sink.Cu
// UpdateCursor updates the active cursor. If no cursor is active and no update occurred, returns
// ErrCursorNotFound. If the update was not successful on the database, returns an error.
// You can use tx=nil to run the query outside of a transaction.
func (l *Loader) UpdateCursor(ctx context.Context, tx *sql.Tx, moduleHash string, c *sink.Cursor) error {
func (l *Loader) UpdateCursor(ctx context.Context, tx Tx, moduleHash string, c *sink.Cursor) error {
_, err := l.runModifiyQuery(ctx, tx, "update", l.getDialect().GetUpdateCursorQuery(
l.cursorTable.identifier, moduleHash, c, c.Block().Num(), c.Block().ID(),
))
Expand Down Expand Up @@ -152,7 +152,7 @@ type sqlExecutor interface {
//
// If `tx` is nil, we use `l.DB` as the execution context, so an operations happening outside
// a transaction. Otherwise, tx is the execution context.
func (l *Loader) runModifiyQuery(ctx context.Context, tx *sql.Tx, action string, query string) (rowsAffected int64, err error) {
func (l *Loader) runModifiyQuery(ctx context.Context, tx Tx, action string, query string) (rowsAffected int64, err error) {
var executor sqlExecutor = l.DB
if tx != nil {
executor = tx
Expand Down
Loading

0 comments on commit 8af16f7

Please sign in to comment.