From f3f607ce5b83bab3986ab79c4df7121a6275507b Mon Sep 17 00:00:00 2001 From: Pushap Goyal Date: Wed, 13 Jul 2022 15:08:01 -0700 Subject: [PATCH] Handle large write batch for intrinsic tmp tables Summary: If the write batch for an intrinsic tmp table goes beyond some threshold, then we commit the write batch accumulated so far and start a new transaction to handle subsequent writes. **Changes** * Added a variable rocksdb_max_intrinsic_tmp_table_write_count which controls the size of the write batch allowed for an intrinsic tmp table. Once we cross that threshold, we will commit the current transaction and then start a new transaction. * If there are valid open iterators before we commit an intrinsic tmp table transaction, then we refresh all iterators back to their previous positions after committing the transaction. A recursive CTE opens multiple iterators for the same transaction, and we need to refresh all of them after we commit the current write batch for the transaction. * I also refreshed the iterators after the first write to the write batch. This is done to handle the special case where we start a rocksdb iterator on a new transaction (with an empty write batch). The iterator will then only see the already committed data, but ignores any new data added to the write batch later. So we proactively refresh the iterator after the first write in the write batch. * Added a status variable rocksdb_intrinsic_tmp_table_commits to count the number of intrinsic tmp table commits. * The create_ondisk_from_heap method uses table->record[1] for insertion. The MyRocks encode_value_slice method relies on table->record[0] for value encoding. We have to reposition the fields in write_row specifically for intrinsic tmp tables to point to record[1]. A better way to fix this is to remove the dependency on record[1] from the encode_value_slice method and add a dependency on the input buf from write_row. 
Reviewed By: luqun Differential Revision: D37835016 --------------------------------------------------------------------------------------- Temporarily disable some of rocksdb.tmp_table test Summary: RocksDB tmp table needs bug fixes, and meanwhile disabling some of the rocksdb.tmp_table tests so that it won't report failures. Reviewed By: lth Differential Revision: D41391615 --------------------------------------------------------------------------------------- set m_read_opts.ignore_range_deletions properly (#1218) Summary: ignore_range_deletions should be set to true when range del is not enabled Pull Request resolved: https://github.com/facebook/mysql-5.6/pull/1218 Reviewed By: Pushapgl Differential Revision: D39115879 Pulled By: yoshinorim --- mysql-test/combinations | 7 +- mysql-test/r/mysqld--help-notwin.result | 5 + mysql-test/suite/json/r/json_table.result | 2 + mysql-test/suite/rocksdb/r/rocksdb.result | 4 + mysql-test/suite/rocksdb/t/tmp_table.test | 18 + ...trinsic_tmp_table_write_count_basic.result | 46 ++ ...intrinsic_tmp_table_write_count_basic.test | 16 + mysql-test/t/error_simulation.test | 2 - .../t/with_recursive_innodb_tmp_table.test | 1 - .../t/with_recursive_rocksdb-master.opt | 1 + storage/rocksdb/ha_rocksdb.cc | 454 ++++++++++++------ storage/rocksdb/ha_rocksdb.h | 13 +- storage/rocksdb/nosql_access.cc | 2 +- storage/rocksdb/rdb_global.h | 5 +- storage/rocksdb/rdb_iterator.cc | 24 +- storage/rocksdb/rdb_iterator.h | 9 +- 16 files changed, 443 insertions(+), 166 deletions(-) create mode 100644 mysql-test/suite/rocksdb_sys_vars/r/rocksdb_max_intrinsic_tmp_table_write_count_basic.result create mode 100644 mysql-test/suite/rocksdb_sys_vars/t/rocksdb_max_intrinsic_tmp_table_write_count_basic.test create mode 100644 mysql-test/t/with_recursive_rocksdb-master.opt diff --git a/mysql-test/combinations b/mysql-test/combinations index 9f9a4eb9ec06..b926ea840fc0 100644 --- a/mysql-test/combinations +++ b/mysql-test/combinations @@ -1,8 +1,9 @@ 
[innodb_intrinsic_table] enable_rocksdb_intrinsic_tmp_table= OFF -loose-rocksdb_enable_tmp_table = OFF +loose-rocksdb_enable_tmp_table= OFF [rocksdb_intrinsic_table] enable_rocksdb_intrinsic_tmp_table= ON -loose-rocksdb_enable_tmp_table = ON -loose-rocksdb_enable_delete_range_for_drop_index = ON +loose-rocksdb_enable_tmp_table= ON +loose-rocksdb_enable_delete_range_for_drop_index= ON +loose-rocksdb_max_intrinsic_tmp_table_write_count= 3 diff --git a/mysql-test/r/mysqld--help-notwin.result b/mysql-test/r/mysqld--help-notwin.result index 97c63dcd4f11..27e1ab45425c 100644 --- a/mysql-test/r/mysqld--help-notwin.result +++ b/mysql-test/r/mysqld--help-notwin.result @@ -2199,6 +2199,10 @@ The following options may be given as the first argument: Track history for at most this many completed compactions. The history is in the INFORMATION_SCHEMA.ROCKSDB_COMPACTION_HISTORY table. + --rocksdb-max-intrinsic-tmp-table-write-count=# + Intrinsic tmp table max allowed write batch size.After + this, current transaction holding write batch will commit + and newtransaction will be started. 
--rocksdb-max-latest-deadlocks=# Maximum number of recent deadlocks to store --rocksdb-max-log-file-size=# @@ -3657,6 +3661,7 @@ rocksdb-max-background-flushes -1 rocksdb-max-background-jobs 2 rocksdb-max-bottom-pri-background-compactions 0 rocksdb-max-compaction-history 64 +rocksdb-max-intrinsic-tmp-table-write-count 1000 rocksdb-max-latest-deadlocks 5 rocksdb-max-log-file-size 0 rocksdb-max-manifest-file-size 1073741824 diff --git a/mysql-test/suite/json/r/json_table.result b/mysql-test/suite/json/r/json_table.result index 8440179ff645..6482857e75a3 100644 --- a/mysql-test/suite/json/r/json_table.result +++ b/mysql-test/suite/json/r/json_table.result @@ -1033,6 +1033,7 @@ Variable_name Value Created_tmp_disk_tables 1 Created_tmp_files 0 Created_tmp_tables 1 +rocksdb_intrinsic_tmp_table_commits 0 set @@max_heap_table_size= @save_heap_size; set @@internal_tmp_mem_storage_engine= @save_mem_se; # @@ -1051,6 +1052,7 @@ Variable_name Value Created_tmp_disk_tables 0 Created_tmp_files 0 Created_tmp_tables 1 +rocksdb_intrinsic_tmp_table_commits 0 # # Bug#25525409: ASSERTION `TABLE_LIST->TABLE' FAILED IN SQL/SQL_BASE.CC # diff --git a/mysql-test/suite/rocksdb/r/rocksdb.result b/mysql-test/suite/rocksdb/r/rocksdb.result index 19e39c1101a9..fb09e222e7c9 100644 --- a/mysql-test/suite/rocksdb/r/rocksdb.result +++ b/mysql-test/suite/rocksdb/r/rocksdb.result @@ -999,6 +999,7 @@ rocksdb_max_background_flushes -1 rocksdb_max_background_jobs 2 rocksdb_max_bottom_pri_background_compactions 0 rocksdb_max_compaction_history 64 +rocksdb_max_intrinsic_tmp_table_write_count 1000 rocksdb_max_latest_deadlocks 5 rocksdb_max_log_file_size 0 rocksdb_max_manifest_file_size 1073741824 @@ -1600,6 +1601,7 @@ rocksdb_table_index_stats_success # rocksdb_table_index_stats_failure # rocksdb_table_index_stats_req_queue_length # rocksdb_covered_secondary_key_lookups # +rocksdb_intrinsic_tmp_table_commits # rocksdb_additional_compaction_triggers # rocksdb_binlog_ttl_compaction_timestamp # 
rocksdb_block_cache_add # @@ -1753,6 +1755,7 @@ ROCKSDB_GET_HIT_L1 ROCKSDB_GET_HIT_L2_AND_UP ROCKSDB_GIT_DATE ROCKSDB_GIT_HASH +ROCKSDB_INTRINSIC_TMP_TABLE_COMMITS ROCKSDB_ITER_BYTES_READ ROCKSDB_LAST_LEVEL_SEEK_DATA ROCKSDB_LAST_LEVEL_SEEK_DATA_USEFUL_FILTER_MATCH @@ -1887,6 +1890,7 @@ ROCKSDB_GET_HIT_L1 ROCKSDB_GET_HIT_L2_AND_UP ROCKSDB_GIT_DATE ROCKSDB_GIT_HASH +ROCKSDB_INTRINSIC_TMP_TABLE_COMMITS ROCKSDB_ITER_BYTES_READ ROCKSDB_LAST_LEVEL_SEEK_DATA ROCKSDB_LAST_LEVEL_SEEK_DATA_USEFUL_FILTER_MATCH diff --git a/mysql-test/suite/rocksdb/t/tmp_table.test b/mysql-test/suite/rocksdb/t/tmp_table.test index c9b34b7bb91a..9b59ae051834 100644 --- a/mysql-test/suite/rocksdb/t/tmp_table.test +++ b/mysql-test/suite/rocksdb/t/tmp_table.test @@ -360,4 +360,22 @@ show create table tmp1; select * from t1; drop table t1; +#################################################### +### [TODO] Fix bugs: Case 12: Intrinsic tmp table background commits +#################################################### +#--echo Case 12: Intrinsic tmp table background commits +#set big_tables = 1; +#Set global rocksdb_max_intrinsic_tmp_table_write_count = 2; +#let $old_value= query_get_value(show status like "rocksdb_intrinsic_tmp_table_commits", Value, 1); +#create table t1(i int, c char(5)); +#insert into t1 values (0, "aaaa"); +#insert into t1 values (1, "aaaa"); +#select i, c, count(*) from t1 group by i, c having count(*) > 0; +#let $new_value= query_get_value(show status like "rocksdb_intrinsic_tmp_table_commits", Value, 1); +#--let $assert_text = Number of intrinsic tmp table commits should be 1 +#--let $assert_cond= $new_value - $old_value = 1 +#--source include/assert.inc +#Set global rocksdb_max_intrinsic_tmp_table_write_count = default; +#drop table t1; + --source include/wait_until_count_sessions.inc diff --git a/mysql-test/suite/rocksdb_sys_vars/r/rocksdb_max_intrinsic_tmp_table_write_count_basic.result 
b/mysql-test/suite/rocksdb_sys_vars/r/rocksdb_max_intrinsic_tmp_table_write_count_basic.result new file mode 100644 index 000000000000..11b22c08e485 --- /dev/null +++ b/mysql-test/suite/rocksdb_sys_vars/r/rocksdb_max_intrinsic_tmp_table_write_count_basic.result @@ -0,0 +1,46 @@ +CREATE TABLE valid_values (value varchar(255)) ENGINE=myisam; +INSERT INTO valid_values VALUES(10); +INSERT INTO valid_values VALUES(20); +CREATE TABLE invalid_values (value varchar(255)) ENGINE=myisam; +INSERT INTO invalid_values VALUES('\'aaa\''); +SET @start_global_value = @@global.rocksdb_max_intrinsic_tmp_table_write_count; +SELECT @start_global_value; +@start_global_value +1000 +'# Setting to valid values in global scope#' +"Trying to set variable @@global.rocksdb_max_intrinsic_tmp_table_write_count to 10" +SET @@global.rocksdb_max_intrinsic_tmp_table_write_count = 10; +SELECT @@global.rocksdb_max_intrinsic_tmp_table_write_count; +@@global.rocksdb_max_intrinsic_tmp_table_write_count +10 +"Setting the global scope variable back to default" +SET @@global.rocksdb_max_intrinsic_tmp_table_write_count = DEFAULT; +SELECT @@global.rocksdb_max_intrinsic_tmp_table_write_count; +@@global.rocksdb_max_intrinsic_tmp_table_write_count +1000 +"Trying to set variable @@global.rocksdb_max_intrinsic_tmp_table_write_count to 20" +SET @@global.rocksdb_max_intrinsic_tmp_table_write_count = 20; +SELECT @@global.rocksdb_max_intrinsic_tmp_table_write_count; +@@global.rocksdb_max_intrinsic_tmp_table_write_count +20 +"Setting the global scope variable back to default" +SET @@global.rocksdb_max_intrinsic_tmp_table_write_count = DEFAULT; +SELECT @@global.rocksdb_max_intrinsic_tmp_table_write_count; +@@global.rocksdb_max_intrinsic_tmp_table_write_count +1000 +"Trying to set variable @@session.rocksdb_max_intrinsic_tmp_table_write_count to 444. It should fail because it is not session." 
+SET @@session.rocksdb_max_intrinsic_tmp_table_write_count = 444; +ERROR HY000: Variable 'rocksdb_max_intrinsic_tmp_table_write_count' is a GLOBAL variable and should be set with SET GLOBAL +'# Testing with invalid values in global scope #' +"Trying to set variable @@global.rocksdb_max_intrinsic_tmp_table_write_count to 'aaa'" +SET @@global.rocksdb_max_intrinsic_tmp_table_write_count = 'aaa'; +Got one of the listed errors +SELECT @@global.rocksdb_max_intrinsic_tmp_table_write_count; +@@global.rocksdb_max_intrinsic_tmp_table_write_count +1000 +SET @@global.rocksdb_max_intrinsic_tmp_table_write_count = @start_global_value; +SELECT @@global.rocksdb_max_intrinsic_tmp_table_write_count; +@@global.rocksdb_max_intrinsic_tmp_table_write_count +1000 +DROP TABLE valid_values; +DROP TABLE invalid_values; diff --git a/mysql-test/suite/rocksdb_sys_vars/t/rocksdb_max_intrinsic_tmp_table_write_count_basic.test b/mysql-test/suite/rocksdb_sys_vars/t/rocksdb_max_intrinsic_tmp_table_write_count_basic.test new file mode 100644 index 000000000000..9378df1687d0 --- /dev/null +++ b/mysql-test/suite/rocksdb_sys_vars/t/rocksdb_max_intrinsic_tmp_table_write_count_basic.test @@ -0,0 +1,16 @@ +--source include/have_rocksdb.inc + +CREATE TABLE valid_values (value varchar(255)) ENGINE=myisam; +INSERT INTO valid_values VALUES(10); +INSERT INTO valid_values VALUES(20); + +CREATE TABLE invalid_values (value varchar(255)) ENGINE=myisam; +INSERT INTO invalid_values VALUES('\'aaa\''); + +--let $sys_var=rocksdb_max_intrinsic_tmp_table_write_count +--let $read_only=0 +--let $session=0 +--source ../include/rocksdb_sys_var.inc + +DROP TABLE valid_values; +DROP TABLE invalid_values; diff --git a/mysql-test/t/error_simulation.test b/mysql-test/t/error_simulation.test index 20dd52307534..bd049c8b37a1 100644 --- a/mysql-test/t/error_simulation.test +++ b/mysql-test/t/error_simulation.test @@ -1,6 +1,4 @@ -- source include/have_debug.inc ---source include/have_innodb_intrinsic_table.inc - # # Bug #28499: 
crash for grouping query when tmp_table_size is too small # diff --git a/mysql-test/t/with_recursive_innodb_tmp_table.test b/mysql-test/t/with_recursive_innodb_tmp_table.test index 73f38439d22c..482a2e7abbd8 100644 --- a/mysql-test/t/with_recursive_innodb_tmp_table.test +++ b/mysql-test/t/with_recursive_innodb_tmp_table.test @@ -1,4 +1,3 @@ ---source include/have_innodb_intrinsic_table.inc --source include/have_64bit.inc --source include/no_valgrind_without_big.inc diff --git a/mysql-test/t/with_recursive_rocksdb-master.opt b/mysql-test/t/with_recursive_rocksdb-master.opt new file mode 100644 index 000000000000..1c26bf22ebb0 --- /dev/null +++ b/mysql-test/t/with_recursive_rocksdb-master.opt @@ -0,0 +1 @@ +--loose-rocksdb_max_intrinsic_tmp_table_write_count=1000000 diff --git a/storage/rocksdb/ha_rocksdb.cc b/storage/rocksdb/ha_rocksdb.cc index 944a57f52952..891b8f766a4b 100644 --- a/storage/rocksdb/ha_rocksdb.cc +++ b/storage/rocksdb/ha_rocksdb.cc @@ -262,6 +262,10 @@ void ha_rocksdb::inc_covered_sk_lookup() { global_stats.covered_secondary_key_lookups.inc(); } +void ha_rocksdb::inc_intrinsic_tmp_table_commits() { + global_stats.intrinsic_tmp_table_commits.inc(); +} + static handler *rocksdb_create_handler(my_core::handlerton *hton, my_core::TABLE_SHARE *table_arg, bool partitioned, @@ -860,6 +864,7 @@ static uint32_t rocksdb_select_bypass_rejected_query_history_size = 0; static uint32_t rocksdb_select_bypass_debug_row_delay = 0; static bool rocksdb_bypass_rpc_on = true; static bool rocksdb_bypass_rpc_log_rejected = false; +static uint32_t rocksdb_max_intrinsic_tmp_table_write_count = 0; static unsigned long long // NOLINT(runtime/int) rocksdb_select_bypass_multiget_min = 0; static bool rocksdb_skip_locks_if_skip_unique_check = false; @@ -2718,6 +2723,14 @@ static MYSQL_SYSVAR_BOOL(column_default_value_as_expression, "allow column default value expressed in function", nullptr, nullptr, true); +static MYSQL_SYSVAR_UINT( + max_intrinsic_tmp_table_write_count, + 
rocksdb_max_intrinsic_tmp_table_write_count, PLUGIN_VAR_RQCMDARG, + "Intrinsic tmp table max allowed write batch size." + "After this, current transaction holding write batch will commit and new" + "transaction will be started.", + nullptr, nullptr, /* default */ 1000, /* min */ 1, /* max */ UINT_MAX, 0); + static const int ROCKSDB_ASSUMED_KEY_VALUE_DISK_SIZE = 100; static struct SYS_VAR *rocksdb_system_variables[] = { @@ -2932,6 +2945,7 @@ static struct SYS_VAR *rocksdb_system_variables[] = { MYSQL_SYSVAR(alter_table_comment_inplace), MYSQL_SYSVAR(column_default_value_as_expression), MYSQL_SYSVAR(enable_delete_range_for_drop_index), + MYSQL_SYSVAR(max_intrinsic_tmp_table_write_count), nullptr}; static bool is_tmp_table(const std::string &tablename) { @@ -3167,7 +3181,7 @@ static int rdb_dbug_set_ttl_read_filter_ts(); */ class Rdb_transaction { protected: - ulonglong m_write_count = 0; + ulonglong m_write_count[2] = {0, 0}; ulonglong m_insert_count = 0; ulonglong m_update_count = 0; ulonglong m_delete_count = 0; @@ -3210,7 +3224,8 @@ class Rdb_transaction { // This should be used only when updating binlog information. virtual rocksdb::WriteBatchBase *get_write_batch() = 0; - virtual bool commit_no_binlog() = 0; + virtual bool commit_no_binlog( + TABLE_TYPE table_type = TABLE_TYPE::USER_TABLE) = 0; /* @detail @@ -3246,7 +3261,7 @@ class Rdb_transaction { virtual void do_rollback_to_savepoint() = 0; public: - rocksdb::ReadOptions m_read_opts; + rocksdb::ReadOptions m_read_opts[2]; const char *m_mysql_log_file_name; my_off_t m_mysql_log_offset; const char *m_mysql_gtid; @@ -3255,6 +3270,7 @@ class Rdb_transaction { int64_t m_snapshot_timestamp = 0; bool m_ddl_transaction; std::shared_ptr m_explicit_snapshot; + bool should_refresh_iterator_after_first_write = false; /* Tracks the number of tables in use through external_lock. 
@@ -3403,7 +3419,10 @@ class Rdb_transaction { virtual void set_lock_timeout(int timeout_sec_arg, TABLE_TYPE table_type) = 0; - ulonglong get_write_count() const { return m_write_count; } + ulonglong get_write_count( + TABLE_TYPE table_type = TABLE_TYPE::USER_TABLE) const { + return m_write_count[table_type]; + } ulonglong get_insert_count() const { return m_insert_count; } @@ -3490,7 +3509,8 @@ class Rdb_transaction { void snapshot_created(const rocksdb::Snapshot *const snapshot) { assert(snapshot != nullptr); - m_read_opts.snapshot = snapshot; + m_read_opts[USER_TABLE].snapshot = snapshot; + // TODO: Use snapshot timestamp from rocksdb Snapshot object itself. This // saves the extra call to fetch current time, and allows TTL compaction // (which uses rocksdb timestamp) to be consistent with TTL read filtering @@ -3506,7 +3526,10 @@ class Rdb_transaction { virtual void acquire_snapshot(bool acquire_now, TABLE_TYPE table_type) = 0; virtual void release_snapshot(TABLE_TYPE table_type) = 0; - bool has_snapshot() const { return m_read_opts.snapshot != nullptr; } + bool has_snapshot(TABLE_TYPE table_type) const { + if (table_type == INTRINSIC_TMP) return false; + return m_read_opts[table_type].snapshot != nullptr; + } private: // The Rdb_sst_info structures we are currently loading. In a partitioned @@ -4098,14 +4121,14 @@ class Rdb_transaction { Add test coverage for what happens when somebody attempts to do bulk inserts while inside a multi-statement transaction. 
*/ - bool flush_batch() { - if (get_write_count() == 0) return false; + bool flush_batch(TABLE_TYPE table_type) { + if (get_write_count(table_type) == 0) return false; /* Commit the current transaction */ - if (commit_no_binlog()) return true; + if (commit_no_binlog(table_type)) return true; /* Start another one */ - start_tx(TABLE_TYPE::USER_TABLE); + start_tx(table_type); return false; } @@ -4178,7 +4201,7 @@ class Rdb_transaction { if (create_snapshot) acquire_snapshot(true, table_type); - rocksdb::ReadOptions options = m_read_opts; + rocksdb::ReadOptions options = m_read_opts[table_type]; const bool fill_cache = !THDVAR(get_thd(), skip_fill_cache); if (skip_bloom_filter) { @@ -4210,7 +4233,8 @@ class Rdb_transaction { protected: // Non-virtual functions with actions to be done on transaction start and // commit. - void on_commit() { + void on_commit(TABLE_TYPE table_type) { + if (table_type == TABLE_TYPE::INTRINSIC_TMP) return; time_t tm; tm = time(nullptr); for (auto &it : modified_tables) { @@ -4266,7 +4290,7 @@ class Rdb_transaction { entire transaction. */ do_set_savepoint(); - m_writes_at_last_savepoint = m_write_count; + m_writes_at_last_savepoint = m_write_count[USER_TABLE]; } /* @@ -4277,7 +4301,7 @@ class Rdb_transaction { // Take another RocksDB savepoint only if we had changes since the last // one. This is very important for long transactions doing lots of // SELECTs. 
- if (m_writes_at_last_savepoint != m_write_count) { + if (m_writes_at_last_savepoint != m_write_count[USER_TABLE]) { rocksdb::Status status = rocksdb::Status::NotFound(); while ((status = do_pop_savepoint()) == rocksdb::Status::OK()) { } @@ -4287,7 +4311,7 @@ class Rdb_transaction { } do_set_savepoint(); - m_writes_at_last_savepoint = m_write_count; + m_writes_at_last_savepoint = m_write_count[USER_TABLE]; } return HA_EXIT_SUCCESS; @@ -4297,7 +4321,7 @@ class Rdb_transaction { Rollback to the savepoint we've set before the last statement */ void rollback_to_stmt_savepoint() { - if (m_writes_at_last_savepoint != m_write_count) { + if (m_writes_at_last_savepoint != m_write_count[USER_TABLE]) { do_rollback_to_savepoint(); /* RollbackToSavePoint "removes the most recent SetSavePoint()", so @@ -4307,7 +4331,7 @@ class Rdb_transaction { statement start) because setting a savepoint is cheap. */ do_set_savepoint(); - m_write_count = m_writes_at_last_savepoint; + m_write_count[USER_TABLE] = m_writes_at_last_savepoint; } } @@ -4373,7 +4397,12 @@ class Rdb_transaction { } explicit Rdb_transaction(THD *const thd) - : m_thd(thd), m_tbl_io_perf(nullptr) {} + : m_thd(thd), m_tbl_io_perf(nullptr) { + m_read_opts[INTRINSIC_TMP].ignore_range_deletions = + !rocksdb_enable_delete_range_for_drop_index; + m_read_opts[USER_TABLE].ignore_range_deletions = + !rocksdb_enable_delete_range_for_drop_index; + } virtual ~Rdb_transaction() { #ifndef NDEBUG @@ -4478,13 +4507,13 @@ class Rdb_transaction_impl : public Rdb_transaction { return true; } - bool commit_no_binlog() override { + bool commit_no_binlog(TABLE_TYPE table_type) override { bool res = false; rocksdb::Status s; s = merge_auto_incr_map( - m_rocksdb_tx[TABLE_TYPE::USER_TABLE]->GetWriteBatch()->GetWriteBatch()); -#ifndef NDEBUG + m_rocksdb_tx[table_type]->GetWriteBatch()->GetWriteBatch()); +#ifndef DBUG_OFF DBUG_EXECUTE_IF("myrocks_commit_merge_io_error", dbug_change_status_to_io_error(&s);); 
DBUG_EXECUTE_IF("myrocks_commit_merge_incomplete", @@ -4496,9 +4525,9 @@ class Rdb_transaction_impl : public Rdb_transaction { goto error; } - release_snapshot(TABLE_TYPE::USER_TABLE); - s = m_rocksdb_tx[TABLE_TYPE::USER_TABLE]->Commit(); -#ifndef NDEBUG + release_snapshot(table_type); + s = m_rocksdb_tx[table_type]->Commit(); +#ifndef DBUG_OFF DBUG_EXECUTE_IF("myrocks_commit_io_error", dbug_change_status_to_io_error(&s);); DBUG_EXECUTE_IF("myrocks_commit_incomplete", @@ -4510,26 +4539,34 @@ class Rdb_transaction_impl : public Rdb_transaction { goto error; } - on_commit(); + on_commit(table_type); error: - on_rollback(); - /* Save the transaction object to be reused */ - release_tx(); - - m_write_count = 0; - m_insert_count = 0; - m_update_count = 0; - m_delete_count = 0; - m_row_lock_count = 0; - set_tx_read_only(false); - m_rollback_only = false; + if (table_type == USER_TABLE) { + on_rollback(); + /* Save the transaction object to be reused */ + release_tx(); + m_write_count[USER_TABLE] = 0; + m_write_count[INTRINSIC_TMP] = 0; + m_insert_count = 0; + m_update_count = 0; + m_delete_count = 0; + m_row_lock_count = 0; + set_tx_read_only(false); + m_rollback_only = false; + } else { + m_write_count[INTRINSIC_TMP] = 0; + // clean up only tmp table tx + m_rocksdb_reuse_tx[INTRINSIC_TMP] = m_rocksdb_tx[INTRINSIC_TMP]; + m_rocksdb_tx[INTRINSIC_TMP] = nullptr; + } return res; } public: void rollback() override { on_rollback(); - m_write_count = 0; + m_write_count[USER_TABLE] = 0; + m_write_count[INTRINSIC_TMP] = 0; m_insert_count = 0; m_update_count = 0; m_delete_count = 0; @@ -4546,18 +4583,18 @@ class Rdb_transaction_impl : public Rdb_transaction { set_tx_read_only(false); m_rollback_only = false; + } else { + m_rocksdb_reuse_tx[INTRINSIC_TMP] = m_rocksdb_tx[INTRINSIC_TMP]; + m_rocksdb_tx[INTRINSIC_TMP] = nullptr; } } void acquire_snapshot(bool acquire_now, TABLE_TYPE table_type) override { if (table_type == INTRINSIC_TMP) { - // intrinsic tmp table iterator currently 
reads all data from local write - // batch, so there is no need for explicit snapshot. TODO(pgl): Revisit - // this later after we enabled commit in batches for big write batch. return; } - if (m_read_opts.snapshot == nullptr) { + if (m_read_opts[table_type].snapshot == nullptr) { const auto thd_ss = std::static_pointer_cast( m_thd->get_explicit_snapshot()); if (thd_ss) { @@ -4569,11 +4606,10 @@ class Rdb_transaction_impl : public Rdb_transaction { } else if (is_tx_read_only()) { snapshot_created(rdb->GetSnapshot()); } else if (acquire_now) { - m_rocksdb_tx[TABLE_TYPE::USER_TABLE]->SetSnapshot(); - snapshot_created(m_rocksdb_tx[TABLE_TYPE::USER_TABLE]->GetSnapshot()); + m_rocksdb_tx[table_type]->SetSnapshot(); + snapshot_created(m_rocksdb_tx[table_type]->GetSnapshot()); } else if (!m_is_delayed_snapshot) { - m_rocksdb_tx[TABLE_TYPE::USER_TABLE]->SetSnapshotOnNextOperation( - m_notifier); + m_rocksdb_tx[table_type]->SetSnapshotOnNextOperation(m_notifier); m_is_delayed_snapshot = true; } } @@ -4583,36 +4619,38 @@ class Rdb_transaction_impl : public Rdb_transaction { if (table_type == INTRINSIC_TMP) { return; } + bool need_clear = m_is_delayed_snapshot; - if (m_read_opts.snapshot != nullptr) { + if (m_read_opts[table_type].snapshot != nullptr) { m_snapshot_timestamp = 0; if (m_explicit_snapshot) { m_explicit_snapshot.reset(); need_clear = false; } else if (is_tx_read_only()) { - rdb->ReleaseSnapshot(m_read_opts.snapshot); + rdb->ReleaseSnapshot(m_read_opts[table_type].snapshot); need_clear = false; } else { need_clear = true; } - m_read_opts.snapshot = nullptr; + m_read_opts[table_type].snapshot = nullptr; } - if (need_clear && m_rocksdb_tx[TABLE_TYPE::USER_TABLE] != nullptr) - m_rocksdb_tx[TABLE_TYPE::USER_TABLE]->ClearSnapshot(); + if (need_clear && m_rocksdb_tx[table_type] != nullptr) + m_rocksdb_tx[table_type]->ClearSnapshot(); m_is_delayed_snapshot = false; } - bool has_snapshot() { return m_read_opts.snapshot != nullptr; } + bool has_snapshot(TABLE_TYPE 
table_type) { + if (table_type == INTRINSIC_TMP) return false; + return m_read_opts[table_type].snapshot != nullptr; + } rocksdb::Status put(rocksdb::ColumnFamilyHandle *const column_family, const rocksdb::Slice &key, const rocksdb::Slice &value, TABLE_TYPE table_type, const bool assume_tracked) override { - if (table_type == USER_TABLE) { - ++m_write_count; - } + ++m_write_count[table_type]; return m_rocksdb_tx[table_type]->Put(column_family, key, value, assume_tracked); } @@ -4620,9 +4658,7 @@ class Rdb_transaction_impl : public Rdb_transaction { rocksdb::Status delete_key(rocksdb::ColumnFamilyHandle *const column_family, const rocksdb::Slice &key, TABLE_TYPE table_type, const bool assume_tracked) override { - if (table_type == USER_TABLE) { - ++m_write_count; - } + ++m_write_count[table_type]; return m_rocksdb_tx[table_type]->Delete(column_family, key, assume_tracked); } @@ -4630,9 +4666,7 @@ class Rdb_transaction_impl : public Rdb_transaction { rocksdb::ColumnFamilyHandle *const column_family, const rocksdb::Slice &key, TABLE_TYPE table_type, const bool assume_tracked) override { - if (table_type == USER_TABLE) { - ++m_write_count; - } + ++m_write_count[table_type]; return m_rocksdb_tx[table_type]->SingleDelete(column_family, key, assume_tracked); } @@ -4663,9 +4697,7 @@ class Rdb_transaction_impl : public Rdb_transaction { */ rocksdb::WriteBatchBase *get_indexed_write_batch( TABLE_TYPE table_type) override { - if (table_type == USER_TABLE) { - ++m_write_count; - } + ++m_write_count[table_type]; return m_rocksdb_tx[table_type]->GetWriteBatch(); } @@ -4685,7 +4717,7 @@ class Rdb_transaction_impl : public Rdb_transaction { // select * from qn; rocksdb::PinnableSlice pin_val; rocksdb::Status s = m_rocksdb_tx[table_type]->Get( - m_read_opts, column_family, key, &pin_val); + m_read_opts[table_type], column_family, key, &pin_val); pin_val.Reset(); return s; } else { @@ -4693,8 +4725,8 @@ class Rdb_transaction_impl : public Rdb_transaction { if (table_type == 
USER_TABLE) { global_stats.queries[QUERIES_POINT].inc(); } - return m_rocksdb_tx[table_type]->Get(m_read_opts, column_family, key, - value); + return m_rocksdb_tx[table_type]->Get(m_read_opts[table_type], + column_family, key, value); } } @@ -4703,8 +4735,9 @@ class Rdb_transaction_impl : public Rdb_transaction { rocksdb::PinnableSlice *values, TABLE_TYPE table_type, rocksdb::Status *statuses, const bool sorted_input) const override { - m_rocksdb_tx[table_type]->MultiGet(m_read_opts, column_family, num_keys, - keys, values, statuses, sorted_input); + m_rocksdb_tx[table_type]->MultiGet(m_read_opts[table_type], column_family, + num_keys, keys, values, statuses, + sorted_input); } rocksdb::Status get_for_update(const Rdb_key_def &key_descr, @@ -4742,18 +4775,18 @@ class Rdb_transaction_impl : public Rdb_transaction { rocksdb::Status s; // If snapshot is null, pass it to GetForUpdate and snapshot is // initialized there. Snapshot validation is skipped in that case. - if (m_read_opts.snapshot == nullptr || do_validate) { + if (m_read_opts[table_type].snapshot == nullptr || do_validate) { s = m_rocksdb_tx[table_type]->GetForUpdate( - m_read_opts, column_family, key, value, exclusive, - m_read_opts.snapshot ? do_validate : false); + m_read_opts[table_type], column_family, key, value, exclusive, + m_read_opts[table_type].snapshot ? 
do_validate : false); } else { // If snapshot is set, and if skipping validation, // call GetForUpdate without validation and set back old snapshot - auto saved_snapshot = m_read_opts.snapshot; - m_read_opts.snapshot = nullptr; - s = m_rocksdb_tx[table_type]->GetForUpdate(m_read_opts, column_family, - key, value, exclusive, false); - m_read_opts.snapshot = saved_snapshot; + auto saved_snapshot = m_read_opts[table_type].snapshot; + m_read_opts[table_type].snapshot = nullptr; + s = m_rocksdb_tx[table_type]->GetForUpdate( + m_read_opts[table_type], column_family, key, value, exclusive, false); + m_read_opts[table_type].snapshot = saved_snapshot; } // row_lock_count is to track per row instead of per key @@ -4803,6 +4836,7 @@ class Rdb_transaction_impl : public Rdb_transaction { m_rocksdb_tx[table_type] = rdb->BeginTransaction( write_opts, tx_opts, m_rocksdb_reuse_tx[table_type]); m_rocksdb_reuse_tx[table_type] = nullptr; + m_read_opts[table_type] = rocksdb::ReadOptions(); } else { write_opts.sync = (rocksdb_flush_log_at_trx_commit == FLUSH_LOG_SYNC) && rdb_sync_wal_supported(); @@ -4819,7 +4853,9 @@ class Rdb_transaction_impl : public Rdb_transaction { write_opts, tx_opts, m_rocksdb_reuse_tx[table_type]); m_rocksdb_reuse_tx[table_type] = nullptr; - m_read_opts = rocksdb::ReadOptions(); + m_read_opts[table_type] = rocksdb::ReadOptions(); + m_read_opts[table_type].ignore_range_deletions = + !rocksdb_enable_delete_range_for_drop_index; set_initial_savepoint(); @@ -4888,7 +4924,7 @@ class Rdb_transaction_impl : public Rdb_transaction { if (org_snapshot != cur_snapshot) { if (org_snapshot != nullptr) m_snapshot_timestamp = 0; - m_read_opts.snapshot = cur_snapshot; + m_read_opts[TABLE_TYPE::USER_TABLE].snapshot = cur_snapshot; if (cur_snapshot != nullptr) { rdb->GetEnv()->GetCurrentTime(&m_snapshot_timestamp); } else { @@ -4943,15 +4979,20 @@ class Rdb_writebatch_impl : public Rdb_transaction { // Called after commit/rollback. 
void reset() { m_batch->Clear(); - m_read_opts = rocksdb::ReadOptions(); + m_read_opts[USER_TABLE] = rocksdb::ReadOptions(); + m_read_opts[USER_TABLE].ignore_range_deletions = + !rocksdb_enable_delete_range_for_drop_index; m_ddl_transaction = false; } private: bool prepare() override { return true; } - bool commit_no_binlog() override { + bool commit_no_binlog(TABLE_TYPE table_type) override { bool res = false; + if (table_type == INTRINSIC_TMP) { + return res; + } rocksdb::Status s; rocksdb::TransactionDBWriteOptimizations optimize; optimize.skip_concurrency_control = true; @@ -4962,7 +5003,7 @@ class Rdb_writebatch_impl : public Rdb_transaction { res = true; goto error; } - release_snapshot(TABLE_TYPE::USER_TABLE); + release_snapshot(table_type); s = rdb->Write(write_opts, optimize, m_batch->GetWriteBatch()); if (!s.ok()) { @@ -4970,12 +5011,12 @@ class Rdb_writebatch_impl : public Rdb_transaction { res = true; goto error; } - on_commit(); + on_commit(table_type); error: on_rollback(); reset(); - m_write_count = 0; + m_write_count[table_type] = 0; m_insert_count = 0; m_update_count = 0; m_delete_count = 0; @@ -5010,7 +5051,7 @@ class Rdb_writebatch_impl : public Rdb_transaction { void rollback() override { on_rollback(); - m_write_count = 0; + m_write_count[TABLE_TYPE::USER_TABLE] = 0; m_insert_count = 0; m_update_count = 0; m_delete_count = 0; @@ -5028,7 +5069,8 @@ class Rdb_writebatch_impl : public Rdb_transaction { assert(false); return; } - if (m_read_opts.snapshot == nullptr) snapshot_created(rdb->GetSnapshot()); + if (m_read_opts[table_type].snapshot == nullptr) + snapshot_created(rdb->GetSnapshot()); } void release_snapshot(TABLE_TYPE table_type) override { @@ -5036,9 +5078,9 @@ class Rdb_writebatch_impl : public Rdb_transaction { assert(false); return; } - if (m_read_opts.snapshot != nullptr) { - rdb->ReleaseSnapshot(m_read_opts.snapshot); - m_read_opts.snapshot = nullptr; + if (m_read_opts[table_type].snapshot != nullptr) { + 
rdb->ReleaseSnapshot(m_read_opts[table_type].snapshot); + m_read_opts[table_type].snapshot = nullptr; } } @@ -5051,7 +5093,7 @@ class Rdb_writebatch_impl : public Rdb_transaction { return rocksdb::Status::NotSupported( "Not supported for intrinsic tmp tables"); } - ++m_write_count; + ++m_write_count[table_type]; m_batch->Put(column_family, key, value); // Note Put/Delete in write batch doesn't return any error code. We simply // return OK here. @@ -5067,7 +5109,7 @@ class Rdb_writebatch_impl : public Rdb_transaction { return rocksdb::Status::NotSupported( "Not supported for intrinsic tmp tables"); } - ++m_write_count; + ++m_write_count[table_type]; m_batch->Delete(column_family, key); return rocksdb::Status::OK(); } @@ -5081,7 +5123,7 @@ class Rdb_writebatch_impl : public Rdb_transaction { return rocksdb::Status::NotSupported( "Not supported for intrinsic tmp tables"); } - ++m_write_count; + ++m_write_count[table_type]; m_batch->SingleDelete(column_family, key); return rocksdb::Status::OK(); } @@ -5098,7 +5140,7 @@ class Rdb_writebatch_impl : public Rdb_transaction { assert(false); return nullptr; } - ++m_write_count; + ++m_write_count[table_type]; return m_batch; } @@ -5112,8 +5154,8 @@ class Rdb_writebatch_impl : public Rdb_transaction { "Not supported for intrinsic tmp tables"); } value->Reset(); - return m_batch->GetFromBatchAndDB(rdb, m_read_opts, column_family, key, - value); + return m_batch->GetFromBatchAndDB(rdb, m_read_opts[table_type], + column_family, key, value); } void multi_get(rocksdb::ColumnFamilyHandle *const column_family, @@ -5125,8 +5167,9 @@ class Rdb_writebatch_impl : public Rdb_transaction { assert(false); return; } - m_batch->MultiGetFromBatchAndDB(rdb, m_read_opts, column_family, num_keys, - keys, values, statuses, sorted_input); + m_batch->MultiGetFromBatchAndDB(rdb, m_read_opts[table_type], column_family, + num_keys, keys, values, statuses, + sorted_input); } rocksdb::Status get_for_update(const Rdb_key_def &key_descr, @@ -5256,6 
+5299,37 @@ class Rdb_ha_data { void set_disable_file_deletions(bool d) { disable_file_deletions = d; } + void add_tmp_table_handler(ha_rocksdb *rocksdb_handler) { + m_tmp_table_handlers.insert(rocksdb_handler); + } + + void remove_tmp_table_handler(ha_rocksdb *rocksdb_handler) { + m_tmp_table_handlers.erase(m_tmp_table_handlers.find(rocksdb_handler)); + } + + bool refresh_iterator_for_all_handlers( + const std::vector<std::string> *output) { + bool res = false; + int count = 0; + for (auto const &handler : m_tmp_table_handlers) { + res = handler->refresh_tmp_table_iterator((*output)[count++]); + if (res) { + return res; + } + } + return res; + } + + void extract_iterator_keys_for_all_handlers( + std::vector<std::string> *output) { + for (auto const &handler : m_tmp_table_handlers) { + // current_key will be empty if the iterator is invalid. + std::string current_key = {}; + handler->extract_snapshot_keys(&current_key); + output->push_back(current_key); + } + } + private: void clear_checkpoint_dir() { if (checkpoint_dir) { @@ -5268,6 +5342,7 @@ class Rdb_ha_data { char *checkpoint_dir; Rdb_transaction *trx; bool disable_file_deletions; + std::multiset<ha_rocksdb *> m_tmp_table_handlers; }; static Rdb_ha_data *&get_ha_data(THD *const thd) { @@ -5289,6 +5364,14 @@ Rdb_transaction *get_tx_from_thd(THD *const thd) { return get_ha_data(thd)->get_trx(); } +void add_tmp_table_handler(THD *const thd, ha_rocksdb *rocksdb_handler) { + get_ha_data(thd)->add_tmp_table_handler(rocksdb_handler); +} + +void remove_tmp_table_handler(THD *const thd, ha_rocksdb *rocksdb_handler) { + get_ha_data(thd)->remove_tmp_table_handler(rocksdb_handler); +} + static void set_tx_on_thd(THD *const thd, Rdb_transaction *trx) { return get_ha_data(thd)->set_trx(trx); } @@ -6664,7 +6747,7 @@ static int rocksdb_start_tx_and_assign_read_view( Rdb_transaction *const tx = get_or_create_tx(thd, TABLE_TYPE::USER_TABLE); Rdb_perf_context_guard guard(tx, thd); - assert(!tx->has_snapshot()); + assert(!tx->has_snapshot(TABLE_TYPE::USER_TABLE)); 
tx->set_tx_read_only(true); rocksdb_register_tx(hton, thd, tx); tx->acquire_snapshot(true, TABLE_TYPE::USER_TABLE); @@ -6721,15 +6804,15 @@ static int rocksdb_start_tx_with_shared_read_view( tx->m_explicit_snapshot = explicit_snapshot; } - assert(!tx->has_snapshot()); + assert(!tx->has_snapshot(TABLE_TYPE::USER_TABLE)); tx->set_tx_read_only(true); rocksdb_register_tx(hton, thd, tx); tx->acquire_snapshot(true, TABLE_TYPE::USER_TABLE); // case: an explicit snapshot was not assigned to this transaction if (!tx->m_explicit_snapshot) { - tx->m_explicit_snapshot = - Rdb_explicit_snapshot::create(ss_info, rdb, tx->m_read_opts.snapshot); + tx->m_explicit_snapshot = Rdb_explicit_snapshot::create( + ss_info, rdb, tx->m_read_opts[TABLE_TYPE::USER_TABLE].snapshot); if (!tx->m_explicit_snapshot) { my_printf_error(ER_UNKNOWN_ERROR, "Could not create snapshot", MYF(0)); error = HA_EXIT_FAILURE; @@ -7847,6 +7930,7 @@ static int rocksdb_init_func(void *const p) { // to IOError or corruption. The good practice is always check it. 
// https://github.com/facebook/rocksdb/wiki/Iterator#error-handling bool is_valid_iterator(rocksdb::Iterator *scan_it) { + if (scan_it == nullptr) return false; if (scan_it->Valid()) { return true; } else { @@ -7981,14 +8065,15 @@ ulonglong ha_rocksdb::load_auto_incr_value_from_index() { assert(!m_key_descr_arr[active_index_pos()]->is_partial_index()); std::unique_ptr save_iterator(new Rdb_iterator_base( - ha_thd(), m_key_descr_arr[active_index_pos()], m_pk_descr, m_tbl_def)); + ha_thd(), nullptr, m_key_descr_arr[active_index_pos()], m_pk_descr, + m_tbl_def)); std::swap(m_iterator, save_iterator); ulonglong last_val = 0; Rdb_transaction *const tx = get_or_create_tx(table->in_use, m_tbl_def->get_table_type()); - const bool is_new_snapshot = !tx->has_snapshot(); + const bool is_new_snapshot = !tx->has_snapshot(m_tbl_def->get_table_type()); if (is_new_snapshot) { tx->acquire_snapshot(true, m_tbl_def->get_table_type()); } @@ -8085,12 +8170,13 @@ int ha_rocksdb::load_hidden_pk_value() { const uint8 save_table_status = table->m_status; std::unique_ptr save_iterator(new Rdb_iterator_base( - ha_thd(), m_key_descr_arr[active_index_pos()], m_pk_descr, m_tbl_def)); + ha_thd(), nullptr, m_key_descr_arr[active_index_pos()], m_pk_descr, + m_tbl_def)); std::swap(m_iterator, save_iterator); Rdb_transaction *const tx = get_or_create_tx(table->in_use, m_tbl_def->get_table_type()); - const bool is_new_snapshot = !tx->has_snapshot(); + const bool is_new_snapshot = !tx->has_snapshot(m_tbl_def->get_table_type()); longlong hidden_pk_id = 1; longlong old = 0; @@ -8620,7 +8706,7 @@ int ha_rocksdb::close(void) { m_pk_descr = nullptr; m_key_descr_arr = nullptr; m_converter = nullptr; - m_iterator = nullptr; + m_iterator.reset(nullptr); free_key_buffers(); if (m_table_handler != nullptr) { @@ -8664,6 +8750,7 @@ static const char *const rdb_error_messages[] = { "RocksDB status: deadlock.", "RocksDB status: expired.", "RocksDB status: try again.", + "RocksDB commit failed for intrinsic tmp 
table.", }; static_assert((sizeof(rdb_error_messages) / sizeof(rdb_error_messages[0])) == @@ -10135,7 +10222,7 @@ int ha_rocksdb::index_read_intern(uchar *const buf, const uchar *const key, Rdb_transaction *const tx = get_or_create_tx(table->in_use, m_tbl_def->get_table_type()); - const bool is_new_snapshot = !tx->has_snapshot(); + const bool is_new_snapshot = !tx->has_snapshot(m_tbl_def->get_table_type()); // Loop as long as we get a deadlock error AND we end up creating the // snapshot here (i.e. it did not exist prior to this) @@ -10309,7 +10396,6 @@ int ha_rocksdb::check(THD *const thd MY_ATTRIBUTE((__unused__)), read_hidden_pk_id_from_rowkey(&hidden_pk_id)) { goto error; } - /* Check if we get the same PK value */ uint packed_size = m_pk_descr->pack_record( table, m_pack_buffer, table->record[0], m_pk_packed_tuple, nullptr, @@ -10321,7 +10407,6 @@ int ha_rocksdb::check(THD *const thd MY_ATTRIBUTE((__unused__)), table_name, rows); goto print_and_error; } - /* Check if we get the same secondary key value */ packed_size = m_key_descr_arr[keyno]->pack_record( table, m_pack_buffer, table->record[0], m_sk_packed_tuple, @@ -10795,8 +10880,8 @@ int ha_rocksdb::index_next_with_direction_intern(uchar *const buf, Rdb_iterator_base *ha_rocksdb::get_pk_iterator() { if (!m_pk_iterator) { - m_pk_iterator.reset( - new Rdb_iterator_base(ha_thd(), m_pk_descr, m_pk_descr, m_tbl_def)); + m_pk_iterator.reset(new Rdb_iterator_base(ha_thd(), nullptr, m_pk_descr, + m_pk_descr, m_tbl_def)); } return m_pk_iterator.get(); } @@ -10974,7 +11059,89 @@ static bool commit_in_the_middle(THD *thd) { bool ha_rocksdb::do_bulk_commit(Rdb_transaction *const tx) { return commit_in_the_middle(table->in_use) && tx->get_write_count() >= THDVAR(table->in_use, bulk_load_size) && - tx->flush_batch(); + tx->flush_batch(TABLE_TYPE::USER_TABLE); +} +/* + Commits the write batch accumulated so far. + Steps: + 1) Check if the write batch is more than threshold. 
+ 2) If yes, then extract the current keys for all open iterators. After write + batch flush, iterators running on write batch will become invalid. So we + won't be able to extract the current keys. + 3) Flush write batch accumulated so far. + 4) Refresh all the iterators to the exact keys where they were before + write batch flush. +*/ +bool ha_rocksdb::do_intrinsic_table_commit(Rdb_transaction *const tx) { + bool res = false; + if (m_tbl_def->get_table_type() == USER_TABLE) return res; + + if (tx->should_refresh_iterator_after_first_write) { + // This is to handle the special case where we start rocksdb iterator on + // new transaction(with empty write batch). Then iterator will only see + // the already committed data, but ignores any new data added in write + // batch later. So we are pro-actively refreshing the iterator after first + // write in write batch. + std::vector<std::string> output; + get_ha_data(ha_thd())->extract_iterator_keys_for_all_handlers(&output); + res = get_ha_data(ha_thd())->refresh_iterator_for_all_handlers(&output); + tx->should_refresh_iterator_after_first_write = false; + return res; + } else if (tx->get_write_count(m_tbl_def->get_table_type()) < + rocksdb_max_intrinsic_tmp_table_write_count) { + return res; + } else { + std::vector<std::string> output; + get_ha_data(ha_thd())->extract_iterator_keys_for_all_handlers(&output); + res = tx->flush_batch(m_tbl_def->get_table_type()); + if (res) { + // NO_LINT_DEBUG + sql_print_error("flush_batch failed for intrinsic table commit"); + return res; + } + res = get_ha_data(ha_thd())->refresh_iterator_for_all_handlers(&output); + inc_intrinsic_tmp_table_commits(); + tx->should_refresh_iterator_after_first_write = true; + return res; + } +} + +bool ha_rocksdb::refresh_tmp_table_iterator(const std::string &key) { + bool res = false; + if (m_tbl_def == nullptr || m_tbl_def->get_table_type() == USER_TABLE) { + return res; + } + // If m_iterator is valid, then after commit reset it back to previous value. 
+ if (m_iterator != nullptr) { + if (!key.empty()) { + const rocksdb::Slice &current_key = rocksdb::Slice(key); + m_iterator->reset(); + rocksdb::Slice empty_end_slice; + if ((res = m_iterator->seek(HA_READ_KEY_OR_NEXT, current_key, + true /*using_full_key*/, empty_end_slice, + true /* read_current */))) { + return res; + } + if (m_iterator->key() != current_key) { + // Key not found in the index + res = HA_ERR_KEY_NOT_FOUND; + assert(false); + m_iterator->reset(); + return res; + } + } else { + m_iterator->reset(); + } + } + return res; +} + +void ha_rocksdb::extract_snapshot_keys(std::string *key) { + if (m_tbl_def != nullptr && m_iterator != nullptr && m_iterator->is_valid()) { + *key = m_iterator->key().ToString(); + } else { + *key = {}; + } } /* @@ -11123,12 +11290,12 @@ int ha_rocksdb::write_row(uchar *const buf) { */ ptrdiff_t ptrdiff = buf - table->record[0]; uchar *save_record_0 = nullptr; - if (ptrdiff) { + if (m_tbl_def->is_intrinsic_tmp_table() && ptrdiff) { save_record_0 = table->record[0]; table->record[0] = buf; for (uint i = 0; i < table->s->fields; i++) { - assert(table->s->field[i]); - table->s->field[i]->move_field_offset(ptrdiff); + assert(table->field[i]); + table->field[i]->move_field_offset(ptrdiff); } assert(m_tbl_def->is_intrinsic_tmp_table()); } @@ -11164,10 +11331,11 @@ int ha_rocksdb::write_row(uchar *const buf) { update_row_stats(ROWS_INSERTED); } - if (ptrdiff) { + if (m_tbl_def->is_intrinsic_tmp_table() && ptrdiff) { table->record[0] = save_record_0; for (uint i = 0; i < table->s->fields; i++) { - table->s->field[i]->move_field_offset(-ptrdiff); + assert(table->field[i]); + table->field[i]->move_field_offset(-ptrdiff); } } @@ -11485,7 +11653,7 @@ int ha_rocksdb::check_and_lock_sk( The bloom filter may need to be disabled for this lookup. 
*/ assert(!m_key_descr_arr[key_id]->is_partial_index()); - Rdb_iterator_base iter(ha_thd(), m_key_descr_arr[key_id], m_pk_descr, + Rdb_iterator_base iter(ha_thd(), nullptr, m_key_descr_arr[key_id], m_pk_descr, m_tbl_def); /* @@ -12126,6 +12294,11 @@ int ha_rocksdb::update_write_row(const uchar *const old_data, DBUG_RETURN(HA_ERR_ROCKSDB_BULK_LOAD); } + if (m_tbl_def->get_table_type() != INTRINSIC_TMP && + do_intrinsic_table_commit(row_info.tx)) { + DBUG_RETURN(HA_ERR_ROCKSDB_TMP_TABLE_COMMIT_FAILED); + } + DBUG_RETURN(HA_EXIT_SUCCESS); } @@ -12275,7 +12448,7 @@ int ha_rocksdb::index_init(uint idx, bool sorted MY_ATTRIBUTE((__unused__))) { m_pk_descr, m_tbl_def, table, dd_table)); } else { m_iterator.reset(new Rdb_iterator_base( - thd, m_key_descr_arr[active_index_pos()], m_pk_descr, m_tbl_def)); + thd, this, m_key_descr_arr[active_index_pos()], m_pk_descr, m_tbl_def)); } // If m_lock_rows is not RDB_LOCK_NONE then we will be doing a get_for_update @@ -12297,7 +12470,7 @@ int ha_rocksdb::index_end() { m_need_build_decoder = false; - m_iterator = nullptr; + m_iterator.reset(nullptr); active_index = MAX_KEY; in_range_check_pushed_down = false; @@ -12436,6 +12609,11 @@ int ha_rocksdb::delete_row(const uchar *const buf) { if (do_bulk_commit(tx)) { DBUG_RETURN(HA_ERR_ROCKSDB_BULK_LOAD); } + + if (m_tbl_def->get_table_type() != INTRINSIC_TMP && + do_intrinsic_table_commit(tx)) { + DBUG_RETURN(HA_ERR_ROCKSDB_TMP_TABLE_COMMIT_FAILED); + } /* TODO(yzha) - row stats are gone in 8.0 stats.rows_deleted++; */ @@ -12860,23 +13038,7 @@ int ha_rocksdb::update_row(const uchar *const old_data, uchar *const new_data) { old_data points to record we're updating. It is the same as the record we've just read (for multi-table UPDATE, too, because SQL layer will make an rnd_pos() call to re-read the record before calling update_row()). - - Only intrinsic table(create_ondisk_from_heap) uses record[1] as buffer to - read/write. All scenarios uses record[0] as buffer to read/write. 
- Rdb_converter uses fields pointing to record[0] for encoding, so updating - the fields to record[1] for tmp tables is required for proper encoding. */ - ptrdiff_t ptrdiff = new_data - table->record[0]; - uchar *save_record_0 = nullptr; - if (ptrdiff) { - save_record_0 = table->record[0]; - table->record[0] = new_data; - for (uint i = 0; i < table->s->fields; i++) { - assert(table->s->field[i]); - table->s->field[i]->move_field_offset(ptrdiff); - } - assert(m_tbl_def->is_intrinsic_tmp_table()); - } ha_statistic_increment(&System_status_var::ha_update_count); int err = check_disk_usage(); @@ -12891,13 +13053,6 @@ int ha_rocksdb::update_row(const uchar *const old_data, uchar *const new_data) { update_row_stats(ROWS_UPDATED); } - if (ptrdiff) { - table->record[0] = save_record_0; - for (uint i = 0; i < table->s->fields; i++) { - table->s->field[i]->move_field_offset(-ptrdiff); - } - } - DBUG_RETURN(rv); } @@ -13590,10 +13745,9 @@ int ha_rocksdb::delete_table(const char *const tablename, HA_EXIT_SUCCESS OK other HA_ERR error code (cannot be SE-specific) */ -int ha_rocksdb::rename_table( - const char *const from, const char *const to, - const dd::Table *from_table_def MY_ATTRIBUTE((__unused__)), - dd::Table *to_table_def MY_ATTRIBUTE((__unused__))) { +int ha_rocksdb::rename_table(const char *const from, const char *const to, + [[maybe_unused]] const dd::Table *from_table_def, + [[maybe_unused]] dd::Table *to_table_def) { DBUG_ENTER_FUNC(); std::string from_str; @@ -15778,6 +15932,8 @@ static void myrocks_update_status() { export_stats.covered_secondary_key_lookups = global_stats.covered_secondary_key_lookups; + export_stats.intrinsic_tmp_table_commits = + global_stats.intrinsic_tmp_table_commits; } static void myrocks_update_memory_status() { @@ -15836,7 +15992,9 @@ static SHOW_VAR myrocks_status_variables[] = { DEF_STATUS_VAR_FUNC("covered_secondary_key_lookups", &export_stats.covered_secondary_key_lookups, SHOW_LONGLONG), - + 
DEF_STATUS_VAR_FUNC("intrinsic_tmp_table_commits", + &export_stats.intrinsic_tmp_table_commits, + SHOW_LONGLONG), {NullS, NullS, SHOW_LONG, SHOW_SCOPE_GLOBAL}}; static int show_myrocks_vars(THD *thd MY_ATTRIBUTE((unused)), SHOW_VAR *var, @@ -17817,7 +17975,7 @@ unsigned long long get_partial_index_sort_max_mem(THD *thd) { const rocksdb::ReadOptions &rdb_tx_acquire_snapshot(Rdb_transaction *tx) { tx->acquire_snapshot(true, TABLE_TYPE::USER_TABLE); - return tx->m_read_opts; + return tx->m_read_opts[TABLE_TYPE::USER_TABLE]; } rocksdb::Iterator *rdb_tx_get_iterator( diff --git a/storage/rocksdb/ha_rocksdb.h b/storage/rocksdb/ha_rocksdb.h index 9a50fa9fd105..c95a15fd92c5 100644 --- a/storage/rocksdb/ha_rocksdb.h +++ b/storage/rocksdb/ha_rocksdb.h @@ -370,6 +370,8 @@ class ha_rocksdb : public my_core::handler, public blob_buffer { bool skip_unique_check() const; bool do_bulk_commit(Rdb_transaction *const tx) MY_ATTRIBUTE((__nonnull__, __warn_unused_result__)); + bool do_intrinsic_table_commit(Rdb_transaction *const tx) + MY_ATTRIBUTE((__nonnull__, __warn_unused_result__)); bool has_hidden_pk(const TABLE *const table) const MY_ATTRIBUTE((__nonnull__, __warn_unused_result__)); @@ -397,6 +399,8 @@ class ha_rocksdb : public my_core::handler, public blob_buffer { Rdb_io_perf m_io_perf; public: + bool refresh_tmp_table_iterator(const std::string &key); + void extract_snapshot_keys(std::string *key); static rocksdb::Range get_range(const Rdb_key_def &kd, uchar buf[]); /* @@ -490,9 +494,9 @@ class ha_rocksdb : public my_core::handler, public blob_buffer { bool should_store_row_debug_checksums() const; int rename_table(const char *const from, const char *const to, - const dd::Table *from_table_def MY_ATTRIBUTE((__unused__)), - dd::Table *to_table_def MY_ATTRIBUTE((__unused__))) override - MY_ATTRIBUTE((__nonnull__, __warn_unused_result__)); + const dd::Table *from_table_def, + dd::Table *to_table_def) override + MY_ATTRIBUTE((__warn_unused_result__, __nonnull__(2, 3))); int 
convert_record_from_storage_format(const rocksdb::Slice *const key, const rocksdb::Slice *const value, @@ -984,6 +988,7 @@ class ha_rocksdb : public my_core::handler, public blob_buffer { void update_row_read(ulonglong count); static void inc_covered_sk_lookup(); + static void inc_intrinsic_tmp_table_commits(); void build_decoder(); void check_build_decoder(); @@ -1152,6 +1157,8 @@ bool should_log_rejected_bypass_rpc(); unsigned long long get_partial_index_sort_max_mem(THD *thd); Rdb_transaction *get_tx_from_thd(THD *const thd); +void add_tmp_table_handler(THD *const thd, ha_rocksdb *rocksdb_handler); +void remove_tmp_table_handler(THD *const thd, ha_rocksdb *rocksdb_handler); const rocksdb::ReadOptions &rdb_tx_acquire_snapshot(Rdb_transaction *tx); diff --git a/storage/rocksdb/nosql_access.cc b/storage/rocksdb/nosql_access.cc index 6b6660edd506..8d3676b32f3f 100644 --- a/storage/rocksdb/nosql_access.cc +++ b/storage/rocksdb/nosql_access.cc @@ -2331,7 +2331,7 @@ bool INLINE_ATTR select_exec::setup_iterator(THD *thd) { m_tbl_def, m_table, m_dd_table)); } else { m_iterator.reset( - new Rdb_iterator_base(thd, m_key_def, m_pk_def, m_tbl_def)); + new Rdb_iterator_base(thd, nullptr, m_key_def, m_pk_def, m_tbl_def)); } return m_iterator == nullptr; diff --git a/storage/rocksdb/rdb_global.h b/storage/rocksdb/rdb_global.h index 8380387cc63b..6ecd53d1bdaf 100644 --- a/storage/rocksdb/rdb_global.h +++ b/storage/rocksdb/rdb_global.h @@ -353,7 +353,8 @@ static_assert(HA_ERR_ROCKSDB_FIRST > HA_ERR_LAST, #define HA_ERR_ROCKSDB_STATUS_DEADLOCK (HA_ERR_ROCKSDB_FIRST + 23) #define HA_ERR_ROCKSDB_STATUS_EXPIRED (HA_ERR_ROCKSDB_FIRST + 24) #define HA_ERR_ROCKSDB_STATUS_TRY_AGAIN (HA_ERR_ROCKSDB_FIRST + 25) -#define HA_ERR_ROCKSDB_LAST HA_ERR_ROCKSDB_STATUS_TRY_AGAIN +#define HA_ERR_ROCKSDB_TMP_TABLE_COMMIT_FAILED (HA_ERR_ROCKSDB_FIRST + 26) +#define HA_ERR_ROCKSDB_LAST HA_ERR_ROCKSDB_TMP_TABLE_COMMIT_FAILED const char *const rocksdb_hton_name = "ROCKSDB"; @@ -424,6 +425,7 @@ struct 
st_global_stats { table_index_stats_result[TABLE_INDEX_STATS_RESULT_MAX]; ib_counter_t covered_secondary_key_lookups; + ib_counter_t intrinsic_tmp_table_commits; }; /* Struct used for exporting status to MySQL */ @@ -450,6 +452,7 @@ struct st_export_stats { ulonglong table_index_stats_req_queue_length; ulonglong covered_secondary_key_lookups; + ulonglong intrinsic_tmp_table_commits; }; /* Struct used for exporting RocksDB memory status */ diff --git a/storage/rocksdb/rdb_iterator.cc b/storage/rocksdb/rdb_iterator.cc index 0a21a8a1541e..63f715919b28 100644 --- a/storage/rocksdb/rdb_iterator.cc +++ b/storage/rocksdb/rdb_iterator.cc @@ -27,7 +27,7 @@ namespace myrocks { Rdb_iterator::~Rdb_iterator() {} -Rdb_iterator_base::Rdb_iterator_base(THD *thd, +Rdb_iterator_base::Rdb_iterator_base(THD *thd, ha_rocksdb *rocksdb_handler, const std::shared_ptr kd, const std::shared_ptr pkd, const Rdb_tbl_def *tbl_def) @@ -35,13 +35,20 @@ Rdb_iterator_base::Rdb_iterator_base(THD *thd, m_pkd(pkd), m_tbl_def(tbl_def), m_thd(thd), + m_rocksdb_handler(rocksdb_handler), m_scan_it(nullptr), m_scan_it_skips_bloom(false), m_scan_it_snapshot(nullptr), m_scan_it_lower_bound(nullptr), m_scan_it_upper_bound(nullptr), m_prefix_buf(nullptr), - m_table_type(tbl_def->get_table_type()) {} + m_table_type(tbl_def->get_table_type()) { + if (tbl_def->get_table_type() == INTRINSIC_TMP) { + if (m_rocksdb_handler) { + add_tmp_table_handler(m_thd, m_rocksdb_handler); + } + } +} Rdb_iterator_base::~Rdb_iterator_base() { release_scan_iterator(); @@ -51,6 +58,11 @@ Rdb_iterator_base::~Rdb_iterator_base() { m_scan_it_upper_bound = nullptr; my_free(m_prefix_buf); m_prefix_buf = nullptr; + if (m_table_type == INTRINSIC_TMP) { + if (m_rocksdb_handler) { + remove_tmp_table_handler(m_thd, m_rocksdb_handler); + } + } } int Rdb_iterator_base::read_before_key(const bool full_key_match, @@ -366,9 +378,9 @@ Rdb_iterator_partial::Rdb_iterator_partial( THD *thd, const std::shared_ptr kd, const std::shared_ptr pkd, const 
Rdb_tbl_def *tbl_def, TABLE *table, const dd::Table *dd_table) - : Rdb_iterator_base(thd, kd, pkd, tbl_def), + : Rdb_iterator_base(thd, nullptr, kd, pkd, tbl_def), m_table(table), - m_iterator_pk(thd, pkd, pkd, tbl_def), + m_iterator_pk(thd, nullptr, pkd, pkd, tbl_def), m_converter(thd, tbl_def, table, dd_table), m_valid(false), m_materialized(false), @@ -625,7 +637,7 @@ int Rdb_iterator_partial::materialize_prefix() { // It is possible that someone else has already materialized this group // before we locked. Double check if the prefix is still empty. - Rdb_iterator_base iter(m_thd, m_kd, m_pkd, m_tbl_def); + Rdb_iterator_base iter(m_thd, nullptr, m_kd, m_pkd, m_tbl_def); m_kd->get_infimum_key(m_cur_prefix_key, &tmp); int rc = iter.seek(HA_READ_KEY_EXACT, cur_prefix_key, false, cur_prefix_key, true /* read current */); @@ -651,7 +663,7 @@ int Rdb_iterator_partial::materialize_prefix() { } m_pkd->get_infimum_key(m_cur_prefix_key, &tmp); - Rdb_iterator_base iter_pk(m_thd, m_pkd, m_pkd, m_tbl_def); + Rdb_iterator_base iter_pk(m_thd, nullptr, m_pkd, m_pkd, m_tbl_def); rc = iter_pk.seek(HA_READ_KEY_EXACT, cur_prefix_key, false, cur_prefix_key, true /* read current */); size_t num_rows = 0; diff --git a/storage/rocksdb/rdb_iterator.h b/storage/rocksdb/rdb_iterator.h index 7e58d6adcd92..b564a12a16a9 100644 --- a/storage/rocksdb/rdb_iterator.h +++ b/storage/rocksdb/rdb_iterator.h @@ -69,6 +69,7 @@ class Rdb_iterator { virtual rocksdb::Slice key() = 0; virtual rocksdb::Slice value() = 0; virtual void reset() = 0; + virtual bool is_valid() = 0; }; class Rdb_iterator_base : public Rdb_iterator { @@ -86,7 +87,8 @@ class Rdb_iterator_base : public Rdb_iterator { int next_with_direction(bool move_forward, bool skip_next); public: - Rdb_iterator_base(THD *thd, const std::shared_ptr kd, + Rdb_iterator_base(THD *thd, ha_rocksdb *rocksdb_handler, + const std::shared_ptr kd, const std::shared_ptr pkd, const Rdb_tbl_def *tbl_def); @@ -109,6 +111,8 @@ class Rdb_iterator_base : public 
Rdb_iterator { void reset() override { release_scan_iterator(); } + bool is_valid() override { return is_valid_iterator(m_scan_it); } + protected: friend class Rdb_iterator; const std::shared_ptr m_kd; @@ -120,6 +124,8 @@ class Rdb_iterator_base : public Rdb_iterator { THD *m_thd; + ha_rocksdb *m_rocksdb_handler; + /* Iterator used for range scans and for full table/index scans */ rocksdb::Iterator *m_scan_it; @@ -222,6 +228,7 @@ class Rdb_iterator_partial : public Rdb_iterator_base { rocksdb::Slice key() override; rocksdb::Slice value() override; void reset() override; + bool is_valid() override { return false; } }; } // namespace myrocks