From f297e40281fba9e3b59acef0ccb364a4be21d5ad Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Marko=20M=C3=A4kel=C3=A4?= <marko.makela@mariadb.com>
Date: Wed, 14 Mar 2018 11:11:01 +0200
Subject: [PATCH] Clean up SPATIAL INDEX lock creation

lock_rec_create(): Take the block as a parameter, like the function
used to be before SPATIAL INDEX support was introduced
for InnoDB tables in MySQL 5.7.

FIXME: This function is still invoked for spatial indexes, by
lock_rec_enqueue_waiting() and sel_set_rtr_rec_lock().

lock_prdt_create(): Create an R-tree page or predicate lock.
---
 storage/innobase/include/lock0lock.h  | 60 ++-------------------
 storage/innobase/include/lock0lock.ic | 35 -------------
 storage/innobase/lock/lock0lock.cc    | 75 +++++++++++++++------------
 storage/innobase/lock/lock0prdt.cc    | 98 +++++++++++++++++++++++++++--------
 4 files changed, 122 insertions(+), 146 deletions(-)

diff --git a/storage/innobase/include/lock0lock.h b/storage/innobase/include/lock0lock.h
index c9500f7fd49..468fe4338aa 100644
--- a/storage/innobase/include/lock0lock.h
+++ b/storage/innobase/include/lock0lock.h
@@ -954,30 +954,6 @@ struct lock_sys_t{
 						is running */
 };
 
-/*********************************************************************//**
-Creates a new record lock and inserts it to the lock queue. Does NOT check
-for deadlocks or lock compatibility!
-@return created lock */
-UNIV_INLINE
-lock_t*
-lock_rec_create(
-/*============*/
-#ifdef WITH_WSREP
-	lock_t*			c_lock,	/*!< conflicting lock */
-	que_thr_t*		thr,	/*!< thread owning trx */
-#endif
-	ulint			type_mode,/*!< in: lock mode and wait
-					flag, type is ignored and
-					replaced by LOCK_REC */
-	const buf_block_t*	block,	/*!< in: buffer block containing
-					the record */
-	ulint			heap_no,/*!< in: heap number of the record */
-	dict_index_t*		index,	/*!< in: index of record */
-	trx_t*			trx,	/*!< in,out: transaction */
-	bool			caller_owns_trx_mutex);
-					/*!< in: true if caller owns
-					trx mutex */
-
 /*************************************************************//**
 Removes a record lock request, waiting or granted, from the queue. */
 void
@@ -987,32 +963,6 @@ lock_rec_discard(
 					record locks which are contained
 					in this lock object are removed */
 
-/** Create a new record lock and inserts it to the lock queue,
-without checking for deadlocks or conflicts.
-@param[in]	type_mode	lock mode and wait flag; type will be replaced
-				with LOCK_REC
-@param[in]	space		tablespace id
-@param[in]	page_no		index page number
-@param[in]	page		R-tree index page, or NULL
-@param[in]	heap_no		record heap number in the index page
-@param[in]	index		the index tree
-@param[in,out]	trx		transaction
-@param[in]	holds_trx_mutex	whether the caller holds trx->mutex
-@return created lock */
-lock_t*
-lock_rec_create_low(
-#ifdef WITH_WSREP
-	lock_t*		c_lock,	/*!< conflicting lock */
-	que_thr_t*	thr,	/*!< thread owning trx */
-#endif
-	ulint		type_mode,
-	ulint		space,
-	ulint		page_no,
-	const page_t*	page,
-	ulint		heap_no,
-	dict_index_t*	index,
-	trx_t*		trx,
-	bool		holds_trx_mutex);
 /** Enqueue a waiting request for a lock which cannot be granted immediately.
 Check for deadlocks.
 @param[in]	type_mode	the requested lock mode (LOCK_S or LOCK_X)
@@ -1022,11 +972,11 @@ Check for deadlocks.
 				waiting lock request is set
 				when performing an insert of
 				an index record
-@param[in]	block		leaf page in the index
-@param[in]	heap_no		record heap number in the block
-@param[in]	index		index tree
+@param[in]	block		B-tree or R-tree index leaf page
+@param[in]	heap_no		record heap number in the block (B-tree)
+@param[in]	index		B-tree or R-tree index
 @param[in,out]	thr		query thread
-@param[in]	prdt		minimum bounding box (spatial index)
+@param[in]	prdt		minimum bounding box (R-tree index)
 @retval	DB_LOCK_WAIT		if the waiting lock was enqueued
 @retval	DB_DEADLOCK		if this transaction was chosen as the victim
 @retval	DB_SUCCESS_LOCKED_REC	if the other transaction was chosen as a victim
@@ -1041,7 +991,7 @@ lock_rec_enqueue_waiting(
 	ulint			heap_no,
 	dict_index_t*		index,
 	que_thr_t*		thr,
-	lock_prdt_t*		prdt);
+	const lock_prdt_t*	prdt = NULL);
 /*************************************************************//**
 Moves the explicit locks on user records to another page if a record
 list start is moved to another page. */
diff --git a/storage/innobase/include/lock0lock.ic b/storage/innobase/include/lock0lock.ic
index 475f2ccedf1..87a7445cd36 100644
--- a/storage/innobase/include/lock0lock.ic
+++ b/storage/innobase/include/lock0lock.ic
@@ -97,38 +97,3 @@ lock_hash_get(
 		return(lock_sys->rec_hash);
 	}
 }
-
-/*********************************************************************//**
-Creates a new record lock and inserts it to the lock queue. Does NOT check
-for deadlocks or lock compatibility!
-@return created lock */
-UNIV_INLINE
-lock_t*
-lock_rec_create(
-/*============*/
-#ifdef WITH_WSREP
-	lock_t*			c_lock,	/*!< conflicting lock */
-	que_thr_t*		thr,	/*!< thread owning trx */
-#endif
-	ulint			type_mode,/*!< in: lock mode and wait
-					flag, type is ignored and
-					replaced by LOCK_REC */
-	const buf_block_t*	block,	/*!< in: buffer block containing
-					the record */
-	ulint			heap_no,/*!< in: heap number of the record */
-	dict_index_t*		index,	/*!< in: index of record */
-	trx_t*			trx,	/*!< in,out: transaction */
-	bool			caller_owns_trx_mutex)
-					/*!< in: TRUE if caller owns
-					trx mutex */
-{
-	btr_assert_not_corrupted(block, index);
-	return lock_rec_create_low(
-#ifdef WITH_WSREP
-		c_lock, thr,
-#endif
-		type_mode,
-		block->page.id.space(), block->page.id.page_no(),
-		block->frame, heap_no,
-		index, trx, caller_owns_trx_mutex);
-}
diff --git a/storage/innobase/lock/lock0lock.cc b/storage/innobase/lock/lock0lock.cc
index 5b79d2c23d0..bb96b11aabe 100644
--- a/storage/innobase/lock/lock0lock.cc
+++ b/storage/innobase/lock/lock0lock.cc
@@ -62,8 +62,10 @@ my_bool	innobase_deadlock_detect;
 /** Total number of cached record locks */
 static const ulint	REC_LOCK_CACHE = 8;
 
+/** Maximum record lock heap size in bytes */
+static const ulint	REC_LOCK_HEAP_SIZE = 256;
 /** Maximum record lock size in bytes */
-static const ulint	REC_LOCK_SIZE = sizeof(ib_lock_t) + 256;
+static const ulint	REC_LOCK_SIZE = sizeof(ib_lock_t) + REC_LOCK_HEAP_SIZE;
 
 /** Total number of cached table locks */
 static const ulint	TABLE_LOCK_CACHE = 8;
@@ -1394,32 +1396,29 @@ wsrep_print_wait_locks(
 }
 #endif /* WITH_WSREP */
 
-/** Create a new record lock and inserts it to the lock queue,
+/** Create and register a B-tree record lock or R-tree predicate lock,
 without checking for deadlocks or conflicts.
 @param[in]	type_mode	lock mode and wait flag; type will be replaced
 				with LOCK_REC
-@param[in]	space		tablespace id
-@param[in]	page_no		index page number
-@param[in]	page		R-tree index page, or NULL
+@param[in]	block		B-tree or R-tree index leaf page
 @param[in]	heap_no		record heap number in the index page
-@param[in]	index		the index tree
+@param[in]	index		B-tree or R-tree index
 @param[in,out]	trx		transaction
 @param[in]	holds_trx_mutex	whether the caller holds trx->mutex
 @return created lock */
+static
 lock_t*
-lock_rec_create_low(
+lock_rec_create(
 #ifdef WITH_WSREP
-	lock_t*		c_lock,	/*!< conflicting lock */
-	que_thr_t*	thr,	/*!< thread owning trx */
+	lock_t*			c_lock,	/*!< conflicting lock */
+	que_thr_t*		thr,	/*!< thread owning trx */
 #endif
-	ulint		type_mode,
-	ulint		space,
-	ulint		page_no,
-	const page_t*	page,
-	ulint		heap_no,
-	dict_index_t*	index,
-	trx_t*		trx,
-	bool		holds_trx_mutex)
+	ulint			type_mode,
+	const buf_block_t*	block,
+	ulint			heap_no,
+	dict_index_t*		index,
+	trx_t*			trx,
+	bool			holds_trx_mutex)
 {
 	lock_t*		lock;
 	ulint		n_bits;
@@ -1428,6 +1427,13 @@ lock_rec_create_low(
 	ut_ad(lock_mutex_own());
 	ut_ad(holds_trx_mutex == trx_mutex_own(trx));
 	ut_ad(dict_index_is_clust(index) || !dict_index_is_online_ddl(index));
+#if 0 /* FIXME: sel_set_rtr_rec_lock() is setting a record lock! */
+	ut_ad(!(type_mode & (LOCK_PREDICATE | LOCK_PRDT_PAGE))
+	      == !dict_index_is_spatial(index));
+#else
+	ut_ad(!(type_mode & (LOCK_PREDICATE | LOCK_PRDT_PAGE))
+	      || dict_index_is_spatial(index));
+#endif
 
 #ifdef UNIV_DEBUG
 	/* Non-locking autocommit read-only transactions should not set
@@ -1449,7 +1455,8 @@ lock_rec_create_low(
 
 	if (UNIV_LIKELY(!(type_mode & (LOCK_PREDICATE | LOCK_PRDT_PAGE)))) {
 		/* Make lock bitmap bigger by a safety margin */
-		n_bits = page_dir_get_n_heap(page) + LOCK_PAGE_BITMAP_MARGIN;
+		n_bits = page_dir_get_n_heap(block->frame)
+			+ LOCK_PAGE_BITMAP_MARGIN;
 		n_bytes = 1 + n_bits / 8;
 	} else {
 		ut_ad(heap_no == PRDT_HEAPNO);
@@ -1472,7 +1479,7 @@ lock_rec_create_low(
 	}
 
  	if (trx->lock.rec_cached >= trx->lock.rec_pool.size()
-	    || sizeof *lock + n_bytes > REC_LOCK_SIZE) {
+	    || n_bytes > REC_LOCK_HEAP_SIZE) {
 		lock = static_cast<lock_t*>(
 			mem_heap_alloc(trx->lock.lock_heap,
 				       sizeof *lock + n_bytes));
@@ -1483,15 +1490,15 @@ lock_rec_create_low(
 	lock->trx = trx;
 	lock->type_mode = (type_mode & ~LOCK_TYPE_MASK) | LOCK_REC;
 	lock->index = index;
-	lock->un_member.rec_lock.space = uint32_t(space);
-	lock->un_member.rec_lock.page_no = uint32_t(page_no);
+	lock->un_member.rec_lock.space = block->page.id.space();
+	lock->un_member.rec_lock.page_no = block->page.id.page_no();
 
 	if (UNIV_LIKELY(!(type_mode & (LOCK_PREDICATE | LOCK_PRDT_PAGE)))) {
 		lock->un_member.rec_lock.n_bits = uint32_t(n_bytes * 8);
 	} else {
 		/* Predicate lock always on INFIMUM (0) */
 		lock->un_member.rec_lock.n_bits = 8;
- 	}
+	}
 	lock_rec_bitmap_reset(lock);
 	lock_rec_set_nth_bit(lock, heap_no);
 	index->table->n_rec_locks++;
@@ -1569,10 +1576,10 @@ lock_rec_create_low(
 	    == INNODB_LOCK_SCHEDULE_ALGORITHM_VATS
 	    && !thd_is_replication_slave_thread(trx->mysql_thd)) {
 		HASH_PREPEND(lock_t, hash, lock_sys->rec_hash,
-			     lock_rec_fold(space, page_no), lock);
+			     lock_rec_lock_fold(lock), lock);
 	} else {
 		HASH_INSERT(lock_t, hash, lock_hash_get(type_mode),
-			    lock_rec_fold(space, page_no), lock);
+			    lock_rec_lock_fold(lock), lock);
 	}
 
 	if (!holds_trx_mutex) {
@@ -1750,11 +1757,11 @@ Check for deadlocks.
 				waiting lock request is set
 				when performing an insert of
 				an index record
-@param[in]	block		leaf page in the index
-@param[in]	heap_no		record heap number in the block
-@param[in]	index		index tree
+@param[in]	block		B-tree or R-tree index leaf page
+@param[in]	heap_no		record heap number in the block (B-tree)
+@param[in]	index		B-tree or R-tree index
 @param[in,out]	thr		query thread
-@param[in]	prdt		minimum bounding box (spatial index)
+@param[in]	prdt		minimum bounding box (R-tree index)
 @retval	DB_LOCK_WAIT		if the waiting lock was enqueued
 @retval	DB_DEADLOCK		if this transaction was chosen as the victim
 @retval	DB_SUCCESS_LOCKED_REC	if the other transaction was chosen as a victim
@@ -1769,7 +1776,7 @@ lock_rec_enqueue_waiting(
 	ulint			heap_no,
 	dict_index_t*		index,
 	que_thr_t*		thr,
-	lock_prdt_t*		prdt)
+	const lock_prdt_t*	prdt)
 {
 	ut_ad(lock_mutex_own());
 	ut_ad(!srv_read_only_mode);
@@ -1802,7 +1809,7 @@ lock_rec_enqueue_waiting(
 #endif
 		type_mode | LOCK_WAIT, block, heap_no, index, trx, TRUE);
 
-	if (prdt && type_mode & LOCK_PREDICATE) {
+	if (UNIV_LIKELY_NULL(prdt) && type_mode & LOCK_PREDICATE) {
 		lock_prdt_set_prdt(lock, prdt);
 	}
 
@@ -1841,9 +1848,9 @@ lock_rec_enqueue_waiting(
 
 	MONITOR_INC(MONITOR_LOCKREC_WAIT);
 
-	if (innodb_lock_schedule_algorithm
+	if (UNIV_LIKELY(!(type_mode & (LOCK_PREDICATE | LOCK_PRDT_PAGE)))
+	    && innodb_lock_schedule_algorithm
 	    == INNODB_LOCK_SCHEDULE_ALGORITHM_VATS
-	    && !prdt
 	    && !thd_is_replication_slave_thread(lock->trx->mysql_thd)) {
 		HASH_DELETE(lock_t, hash, lock_sys->rec_hash,
 			    lock_rec_lock_fold(lock), lock);
@@ -2129,7 +2136,7 @@ lock_rec_lock_slow(
 #ifdef WITH_WSREP
 			c_lock,
 #endif /* WITH_WSREP */
-			mode, block, heap_no, index, thr, NULL);
+			mode, block, heap_no, index, thr);
 	} else if (!impl) {
 		/* Set the requested lock on the record, note that
 		we already own the transaction mutex. */
@@ -5988,7 +5995,7 @@ lock_rec_insert_check_and_lock(
 #ifdef WITH_WSREP
 			c_lock,
 #endif /* WITH_WSREP */
-			type_mode, block, heap_no, index, thr, NULL);
+			type_mode, block, heap_no, index, thr);
 
 		trx_mutex_exit(trx);
 	} else {
diff --git a/storage/innobase/lock/lock0prdt.cc b/storage/innobase/lock/lock0prdt.cc
index 5843508741d..dfb48eafd9f 100644
--- a/storage/innobase/lock/lock0prdt.cc
+++ b/storage/innobase/lock/lock0prdt.cc
@@ -425,6 +425,69 @@ lock_prdt_find_on_page(
 	return(NULL);
 }
 
+/** Create and register an R-tree page or predicate lock,
+without checking for deadlocks or conflicts.
+@param[in]	space		tablespace id
+@param[in]	page_no		R-tree index page number
+@param[in]	index		R-tree index
+@param[in,out]	trx		transaction
+@param[in]	holds_trx_mutex	whether the trx->mutex is held
+@return created lock */
+static
+lock_t*
+lock_prdt_create(
+	ulint		type_mode,
+	ulint		space,
+	ulint		page_no,
+	dict_index_t*	index,
+	trx_t*		trx,
+	bool		holds_trx_mutex = false)
+{
+	ut_ad(lock_mutex_own());
+	ut_ad(trx_mutex_own(trx) == holds_trx_mutex);
+	ut_ad(dict_index_is_spatial(index));
+	ut_ad(type_mode & (LOCK_PREDICATE | LOCK_PRDT_PAGE));
+	ut_ad(type_mode & LOCK_REC);
+	ut_ad((type_mode & LOCK_MODE_MASK) == LOCK_S);
+
+	lock_t* lock = trx->lock.rec_cached >= trx->lock.rec_pool.size()
+		? static_cast<lock_t*>(mem_heap_alloc(trx->lock.lock_heap,
+						      1 + sizeof *lock))
+		: trx->lock.rec_pool[trx->lock.rec_cached++];
+
+	lock->trx = trx;
+	lock->type_mode = type_mode;
+	lock->index = index;
+	lock->un_member.rec_lock.space = uint32_t(space);
+	lock->un_member.rec_lock.page_no = uint32_t(page_no);
+
+	/* Predicate lock always on INFIMUM (0) */
+	lock->un_member.rec_lock.n_bits = 8;
+	*reinterpret_cast<byte*>(&lock[1]) = 1U << PRDT_HEAPNO;
+	index->table->n_rec_locks++;
+	ut_ad(index->table->n_ref_count > 0 || !index->table->can_be_evicted);
+
+	HASH_INSERT(lock_t, hash, lock_hash_get(type_mode),
+		    lock_rec_fold(space, page_no), lock);
+
+	if (!holds_trx_mutex) {
+		trx_mutex_enter(trx);
+	}
+	ut_ad(trx_mutex_own(trx));
+	if (type_mode & LOCK_WAIT) {
+		lock_set_lock_and_trx_wait(lock, trx);
+	}
+	UT_LIST_ADD_LAST(trx->lock.trx_locks, lock);
+	if (!holds_trx_mutex) {
+		trx_mutex_exit(trx);
+	}
+
+	MONITOR_INC(MONITOR_RECLOCK_CREATED);
+	MONITOR_INC(MONITOR_NUM_RECLOCK);
+
+	return lock;
+}
+
 /*********************************************************************//**
 Adds a predicate lock request in the predicate lock queue.
 @return	lock where the bit was set */
@@ -496,11 +559,9 @@ lock_prdt_add_to_queue(
 		}
 	}
 
-	lock = lock_rec_create(
-#ifdef WITH_WSREP
-		NULL, NULL, /* FIXME: replicate SPATIAL INDEX locks */
-#endif
-		type_mode, block, PRDT_HEAPNO, index, trx,
+	lock = lock_prdt_create(
+		type_mode, block->page.id.space(),
+		block->page.id.page_no(), index, trx,
 		caller_owns_trx_mutex);
 
 	if (lock->type_mode & LOCK_PREDICATE) {
@@ -838,16 +899,14 @@ lock_prdt_lock(
 
 	lock_mutex_enter();
 
-	const ulint	prdt_mode = mode | type_mode;
+	const ulint	prdt_mode = mode | type_mode | LOCK_REC;
 	lock_t*		lock = lock_rec_get_first_on_page(hash, block);
 
 	if (lock == NULL) {
-		lock = lock_rec_create(
-#ifdef WITH_WSREP
-			NULL, NULL, /* FIXME: replicate SPATIAL INDEX locks */
-#endif
-			mode | type_mode, block, PRDT_HEAPNO,
-			index, trx, FALSE);
+		lock = lock_prdt_create(
+			prdt_mode,
+			block->page.id.space(), block->page.id.page_no(),
+			index, trx);
 
 		status = LOCK_REC_SUCCESS_CREATED;
 	} else {
@@ -878,7 +937,7 @@ lock_prdt_lock(
 						NULL, /* FIXME: replicate
 						      SPATIAL INDEX locks */
 #endif
-						mode | type_mode,
+						prdt_mode,
 						block, PRDT_HEAPNO,
 						index, thr, prdt);
 				} else {
@@ -939,8 +998,6 @@ lock_place_prdt_page_lock(
 
 	const lock_t*	lock = lock_rec_get_first_on_page_addr(
 		lock_sys->prdt_page_hash, space, page_no);
-
-	const ulint	mode = LOCK_S | LOCK_PRDT_PAGE;
 	trx_t*		trx = thr_get_trx(thr);
 
 	if (lock != NULL) {
@@ -954,19 +1011,16 @@ lock_place_prdt_page_lock(
 			lock = lock_rec_get_next_on_page_const(lock);
 		}
 
-		ut_ad(lock == NULL || lock->type_mode == (mode | LOCK_REC));
+		ut_ad(lock == NULL || lock->type_mode
+		      == (LOCK_S | LOCK_PRDT_PAGE | LOCK_REC));
 		ut_ad(lock == NULL || lock_rec_get_n_bits(lock) != 0);
 
 		trx_mutex_exit(trx);
 	}
 
 	if (lock == NULL) {
-		lock = lock_rec_create_low(
-#ifdef WITH_WSREP
-			NULL, NULL, /* FIXME: replicate SPATIAL INDEX locks */
-#endif
-			mode, space, page_no, NULL, PRDT_HEAPNO,
-			index, trx, FALSE);
+		lock = lock_prdt_create(LOCK_S | LOCK_PRDT_PAGE | LOCK_REC,
+					space, page_no, index, trx);
 
 #ifdef PRDT_DIAG
 		printf("GIS_DIAGNOSTIC: page lock %d\n", (int) page_no);
-- 
2.16.2

