commit 71b53e744314528b6785121af3016a3af0edcb16
Author: Thirunarayanan Balathandayuthapani <thiru@mariadb.com>
Date:   Thu Apr 20 22:01:37 2023 +0530

    MDEV-30996      INSERT..SELECT in presence of fulltext index
                      freezes all other commits at commit time
    
    - Introduced new variable innodb_fts_threads which processing
    fulltext message, optimization of fulltext table.
    Minimum value is 1, default value is 2 and maximum value is 255
    
    - By having multiple fts threads, InnoDB can do sync of multiple
    table at the same time.
    
    - Introduce the class fts_slots_t, which can be used to store
    the fts table in the slot.

diff --git a/mysql-test/suite/sys_vars/r/innodb_fts_threads.result b/mysql-test/suite/sys_vars/r/innodb_fts_threads.result
new file mode 100644
index 00000000000..c491b0fbf9c
--- /dev/null
+++ b/mysql-test/suite/sys_vars/r/innodb_fts_threads.result
@@ -0,0 +1,43 @@
+SET @start_global_value = @@global.innodb_fts_threads;
+select @@global.innodb_fts_threads;
+@@global.innodb_fts_threads
+2
+select @@session.innodb_fts_threads;
+ERROR HY000: Variable 'innodb_fts_threads' is a GLOBAL variable
+show global variables like 'innodb_fts_threads';
+Variable_name	Value
+innodb_fts_threads	2
+show session variables like 'innodb_fts_threads';
+Variable_name	Value
+innodb_fts_threads	2
+select * from information_schema.global_variables
+where variable_name='innodb_fts_threads';
+VARIABLE_NAME	VARIABLE_VALUE
+INNODB_FTS_THREADS	2
+select * from information_schema.session_variables
+where variable_name='innodb_fts_threads';
+VARIABLE_NAME	VARIABLE_VALUE
+INNODB_FTS_THREADS	2
+set global innodb_fts_threads=5;
+select @@global.innodb_fts_threads;
+@@global.innodb_fts_threads
+5
+set global innodb_fts_threads=2;
+select @@global.innodb_fts_threads;
+@@global.innodb_fts_threads
+2
+set session innodb_fts_threads=1;
+ERROR HY000: Variable 'innodb_fts_threads' is a GLOBAL variable and should be set with SET GLOBAL
+set global innodb_fts_threads=1.1;
+ERROR 42000: Incorrect argument type to variable 'innodb_fts_threads'
+set global innodb_fts_threads=1e1;
+ERROR 42000: Incorrect argument type to variable 'innodb_fts_threads'
+set global innodb_fts_threads="foo";
+ERROR 42000: Incorrect argument type to variable 'innodb_fts_threads'
+set global innodb_fts_threads=0;
+Warnings:
+Warning	1292	Truncated incorrect innodb_fts_threads value: '0'
+select @@global.innodb_fts_threads;
+@@global.innodb_fts_threads
+1
+SET @@global.innodb_fts_threads = @start_global_value;
diff --git a/mysql-test/suite/sys_vars/r/sysvars_innodb.result b/mysql-test/suite/sys_vars/r/sysvars_innodb.result
index e07725abbeb..c0e0ac70d47 100644
--- a/mysql-test/suite/sys_vars/r/sysvars_innodb.result
+++ b/mysql-test/suite/sys_vars/r/sysvars_innodb.result
@@ -763,6 +763,18 @@ NUMERIC_BLOCK_SIZE	0
 ENUM_VALUE_LIST	NULL
 READ_ONLY	YES
 COMMAND_LINE_ARGUMENT	REQUIRED
+VARIABLE_NAME	INNODB_FTS_THREADS
+SESSION_VALUE	NULL
+DEFAULT_VALUE	2
+VARIABLE_SCOPE	GLOBAL
+VARIABLE_TYPE	INT UNSIGNED
+VARIABLE_COMMENT	Number of threads performing background fts operation 
+NUMERIC_MIN_VALUE	1
+NUMERIC_MAX_VALUE	255
+NUMERIC_BLOCK_SIZE	0
+ENUM_VALUE_LIST	NULL
+READ_ONLY	NO
+COMMAND_LINE_ARGUMENT	REQUIRED
 VARIABLE_NAME	INNODB_FT_AUX_TABLE
 SESSION_VALUE	NULL
 DEFAULT_VALUE	
diff --git a/mysql-test/suite/sys_vars/t/innodb_fts_threads.test b/mysql-test/suite/sys_vars/t/innodb_fts_threads.test
new file mode 100644
index 00000000000..2d9c9fd8636
--- /dev/null
+++ b/mysql-test/suite/sys_vars/t/innodb_fts_threads.test
@@ -0,0 +1,46 @@
+# Variable name: innodb_fts_threads
+# Scope: Global
+# Access type: Dynamic
+# Data type: numeric
+
+--source include/have_innodb.inc
+
+SET @start_global_value = @@global.innodb_fts_threads;
+
+#
+# exists as global only
+#
+select @@global.innodb_fts_threads;
+--error ER_INCORRECT_GLOBAL_LOCAL_VAR
+select @@session.innodb_fts_threads;
+show global variables like 'innodb_fts_threads';
+show session variables like 'innodb_fts_threads';
+
+select * from information_schema.global_variables
+where variable_name='innodb_fts_threads';
+select * from information_schema.session_variables
+where variable_name='innodb_fts_threads';
+
+#
+# show that it's writable
+#
+set global innodb_fts_threads=5;
+select @@global.innodb_fts_threads;
+set global innodb_fts_threads=2;
+select @@global.innodb_fts_threads;
+--error ER_GLOBAL_VARIABLE
+set session innodb_fts_threads=1;
+
+#
+# incorrect types
+#
+--error ER_WRONG_TYPE_FOR_VAR
+set global innodb_fts_threads=1.1;
+--error ER_WRONG_TYPE_FOR_VAR
+set global innodb_fts_threads=1e1;
+--error ER_WRONG_TYPE_FOR_VAR
+set global innodb_fts_threads="foo";
+set global innodb_fts_threads=0;
+select @@global.innodb_fts_threads;
+
+SET @@global.innodb_fts_threads = @start_global_value;
diff --git a/storage/innobase/fts/fts0fts.cc b/storage/innobase/fts/fts0fts.cc
index c54735cb819..685a567c499 100644
--- a/storage/innobase/fts/fts0fts.cc
+++ b/storage/innobase/fts/fts0fts.cc
@@ -4247,6 +4247,25 @@ static dberr_t fts_sync(fts_sync_t *sync, bool unlock_cache, bool wait)
 	sync->unlock_cache = unlock_cache;
 	sync->in_progress = true;
 
+	if (cache->total_size == 0)
+	{
+func_exit:
+          sync->in_progress = false;
+          pthread_cond_broadcast(&sync->cond);
+          mysql_mutex_unlock(&cache->lock);
+          /* We need to check whether an optimize is required, for
+          that we make copies of the two variables that control
+          the trigger. These variables can change behind our
+          back and we don't want to hold the lock for longer
+          than is needed. */
+          mysql_mutex_lock(&cache->deleted_lock);
+          cache->added = 0;
+          cache->deleted = 0;
+          mysql_mutex_unlock(&cache->deleted_lock);
+          DEBUG_SYNC_C("fts_sync_end");
+          return(error);
+	}
+
 	DEBUG_SYNC_C("fts_sync_begin");
 	fts_sync_begin(sync);
 
@@ -4312,21 +4331,7 @@ static dberr_t fts_sync(fts_sync_t *sync, bool unlock_cache, bool wait)
 
 	mysql_mutex_lock(&cache->lock);
 	ut_ad(sync->in_progress);
-	sync->in_progress = false;
-	pthread_cond_broadcast(&sync->cond);
-	mysql_mutex_unlock(&cache->lock);
-	/* We need to check whether an optimize is required, for that
-	we make copies of the two variables that control the trigger. These
-	variables can change behind our back and we don't want to hold the
-	lock for longer than is needed. */
-	mysql_mutex_lock(&cache->deleted_lock);
-
-	cache->added = 0;
-	cache->deleted = 0;
-
-	mysql_mutex_unlock(&cache->deleted_lock);
-
-	return(error);
+	goto func_exit;
 }
 
 /** Run SYNC on the table, i.e., write out data from the cache to the
@@ -5250,7 +5255,8 @@ fts_t::fts_t(
 	added_synced(0), dict_locked(0),
 	add_wq(NULL),
 	cache(NULL),
-	doc_col(ULINT_UNDEFINED), in_queue(false), sync_message(false),
+	doc_col(ULINT_UNDEFINED), wait_in_queue(false),
+	in_queue(false), sync_message(false), in_process(false),
 	fts_heap(heap)
 {
 	ut_a(table->fts == NULL);
@@ -5260,6 +5266,7 @@ fts_t::fts_t(
 	indexes = ib_vector_create(heap_alloc, sizeof(dict_index_t*), 4);
 
 	dict_table_get_all_fts_indexes(table, indexes);
+	pthread_cond_init(&fts_queue_cond, nullptr);
 }
 
 /** fts_t destructor. */
@@ -5272,6 +5279,7 @@ fts_t::~fts_t()
 		fts_cache_destroy(cache);
 	}
 
+	pthread_cond_destroy(&fts_queue_cond);
 	/* There is no need to call ib_vector_free() on this->indexes
 	because it is stored in this->fts_heap. */
 	mem_heap_free(fts_heap);
diff --git a/storage/innobase/fts/fts0opt.cc b/storage/innobase/fts/fts0opt.cc
index 606fd7ef1af..57386691cef 100644
--- a/storage/innobase/fts/fts0opt.cc
+++ b/storage/innobase/fts/fts0opt.cc
@@ -38,6 +38,9 @@ Completed 2011/7/10 Sunny and Jimmy Yang
 #include "fts0opt.h"
 #include "fts0vlc.h"
 #include "wsrep.h"
+#include <mutex>
+#include <thread>
+#include <list>
 
 #ifdef WITH_WSREP
 extern Atomic_relaxed<bool> wsrep_sst_disable_writes;
@@ -47,18 +50,74 @@ constexpr bool wsrep_sst_disable_writes= false;
 
 /** The FTS optimize thread's work queue. */
 ib_wqueue_t* fts_optimize_wq;
-static void fts_optimize_callback(void *);
+static void fts_optimize_func(void *);
 static void timer_callback(void*);
 static tpool::timer* timer;
 
-static tpool::task_group task_group(1);
-static tpool::task task(fts_optimize_callback,0, &task_group);
+static tpool::task_group *task_group= nullptr;
+static tpool::task *task= nullptr;
 
-/** FTS optimize thread, for MDL acquisition */
-static THD *fts_opt_thd;
+/** Mutex to protect srv_n_fts_threads_entered */
+std::mutex fts_thread_mutex;
 
-/** The FTS vector to store fts_slot_t */
-static ib_vector_t*  fts_slots;
+/** Number of innodb fts threads */
+extern uint innodb_n_fts_threads;
+
+/** Number of innodb fts threads currently executing
+fts_optimize_func() */
+static uint srv_n_fts_threads_entered;
+
+/** FTS subsystem processed shutdown message */
+static bool fts_shutdown_processed;
+
+/* Use the mutex to set the number of fts threads */
+std::mutex fts_threads_cnt_mutex;
+
+class fts_thd_t
+{
+public:
+  THD* m_thd;
+  bool m_acquired;
+  fts_thd_t(THD *thd): m_thd(thd), m_acquired(false) {}
+
+  ~fts_thd_t()
+  {
+    destroy_background_thd(m_thd);
+  }
+};
+
+static std::list<fts_thd_t*> fts_thds;
+
+static THD* acquire_fts_thd()
+{
+  std::unique_lock<std::mutex> lk(fts_threads_cnt_mutex);
+  for (std::list<fts_thd_t*>::iterator it= fts_thds.begin();
+       it != fts_thds.end(); it++)
+  {
+    fts_thd_t* fts_thd= *it;
+    if (fts_thd->m_acquired) continue;
+    fts_thd->m_acquired= true;
+    return fts_thd->m_thd;
+  }
+
+  return nullptr;
+}
+
+static void release_fts_thd(THD *thd)
+{
+  std::unique_lock<std::mutex> lk(fts_threads_cnt_mutex);
+  for (std::list<fts_thd_t*>::iterator it= fts_thds.begin();
+       it != fts_thds.end(); it++)
+  {
+    fts_thd_t* fts_thd= *it;
+    if (fts_thd->m_thd == thd)
+    {
+      ut_ad(fts_thd->m_acquired);
+      fts_thd->m_acquired= false;
+      return;
+    }
+  }
+}
 
 /** Default optimize interval in secs. */
 static const ulint FTS_OPTIMIZE_INTERVAL_IN_SECS = 300;
@@ -201,6 +260,10 @@ struct fts_slot_t {
 
 	/** time(NULL) of latest successful fts_optimize_table() */
 	time_t		completed;
+
+	fts_slot_t(dict_table_t *new_table):
+	   table(new_table), running(false), added(0),
+	   deleted(0), last_run(0), completed(0) {}
 };
 
 /** A table remove message for the FTS optimize thread. */
@@ -2384,48 +2447,24 @@ fts_optimize_reset_start_time(
 	return(error);
 }
 
-/*********************************************************************//**
-Run OPTIMIZE on the given table by a background thread.
+/** Run OPTIMIZE on the given table by a background thread.
+@param	table		table to be optimized
+@param	threshold	processed optimization
 @return DB_SUCCESS if all OK */
 static MY_ATTRIBUTE((nonnull))
-dberr_t
-fts_optimize_table_bk(
-/*==================*/
-	fts_slot_t*	slot)	/*!< in: table to optimiza */
+dberr_t fts_optimize_table_bk(dict_table_t *table, bool &threshold)
 {
-	const time_t now = time(NULL);
-	const ulint interval = ulint(now - slot->last_run);
-
-	/* Avoid optimizing tables that were optimized recently. */
-	if (slot->last_run > 0
-	    && lint(interval) >= 0
-	    && interval < FTS_OPTIMIZE_INTERVAL_IN_SECS) {
-
-		return(DB_SUCCESS);
-	}
-
-	dict_table_t*	table = slot->table;
-	dberr_t		error;
-
-	if (table->is_accessible()
-	    && table->fts && table->fts->cache
-	    && table->fts->cache->deleted >= FTS_OPTIMIZE_THRESHOLD) {
-		error = fts_optimize_table(table);
-
-		slot->last_run = time(NULL);
-
-		if (error == DB_SUCCESS) {
-			slot->running = false;
-			slot->completed = slot->last_run;
-		}
-	} else {
-		/* Note time this run completed. */
-		slot->last_run = now;
-		error = DB_SUCCESS;
-	}
-
-	return(error);
+  dberr_t error= DB_SUCCESS;
+  if (table->is_accessible()
+      && table->fts && table->fts->cache
+      && table->fts->cache->deleted >= FTS_OPTIMIZE_THRESHOLD)
+  {
+    error = fts_optimize_table(table);
+    threshold = true;
+  }
+  return(error);
 }
+
 /*********************************************************************//**
 Run OPTIMIZE on the given table.
 @return DB_SUCCESS if all OK */
@@ -2551,7 +2590,7 @@ fts_optimize_create_msg(
 static void add_msg(fts_msg_t *msg)
 {
   ib_wqueue_add(fts_optimize_wq, msg, msg->heap, true);
-  srv_thread_pool->submit_task(&task);
+  srv_thread_pool->submit_task(task);
 }
 
 /**
@@ -2560,7 +2599,7 @@ will only recalculate is_sync_needed, in case the queue is empty.
 */
 static void timer_callback(void*)
 {
-  srv_thread_pool->submit_task(&task);
+  srv_thread_pool->submit_task(task);
 }
 
 /** Add the table to add to the OPTIMIZER's list.
@@ -2580,9 +2619,15 @@ void fts_optimize_add_table(dict_table_t* table)
 
 	mysql_mutex_lock(&fts_optimize_wq->mutex);
 
+	if (table->fts->in_queue || table->fts->wait_in_queue) {
+		mysql_mutex_unlock(&fts_optimize_wq->mutex);
+		mem_heap_free(msg->heap);
+		return;
+	}
+
 	add_msg(msg);
 
-	table->fts->in_queue = true;
+	table->fts->wait_in_queue = true;
 
 	mysql_mutex_unlock(&fts_optimize_wq->mutex);
 }
@@ -2609,19 +2654,28 @@ fts_optimize_remove_table(
 
   mysql_mutex_lock(&fts_optimize_wq->mutex);
 
-  if (table->fts->in_queue)
+  if (!table->fts->wait_in_queue && !table->fts->in_queue)
   {
-    fts_msg_t *msg= fts_optimize_create_msg(FTS_MSG_DEL_TABLE, nullptr);
-    pthread_cond_t cond;
-    pthread_cond_init(&cond, nullptr);
-    msg->ptr= new(mem_heap_alloc(msg->heap, sizeof(fts_msg_del_t)))
-      fts_msg_del_t{table, &cond};
-    add_msg(msg);
-    my_cond_wait(&cond, &fts_optimize_wq->mutex.m_mutex);
-    pthread_cond_destroy(&cond);
-    ut_ad(!table->fts->in_queue);
+    mysql_mutex_unlock(&fts_optimize_wq->mutex);
+    return;
   }
 
+  /* Make sure that InnoDB table was added in fts_slots */
+  while (!table->fts->in_queue || table->fts->sync_message
+         || table->fts->in_process)
+    my_cond_wait(&table->fts->fts_queue_cond,
+                 &fts_optimize_wq->mutex.m_mutex);
+
+  fts_msg_t *msg= fts_optimize_create_msg(FTS_MSG_DEL_TABLE, nullptr);
+  pthread_cond_t cond;
+  pthread_cond_init(&cond, nullptr);
+  msg->ptr= new(mem_heap_alloc(msg->heap, sizeof(fts_msg_del_t)))
+    fts_msg_del_t{table, &cond};
+  add_msg(msg);
+  my_cond_wait(&cond, &fts_optimize_wq->mutex.m_mutex);
+  pthread_cond_destroy(&cond);
+  ut_ad(!table->fts->in_queue);
+
   mysql_mutex_unlock(&fts_optimize_wq->mutex);
 }
 
@@ -2648,132 +2702,6 @@ void fts_optimize_request_sync_table(dict_table_t *table)
   mysql_mutex_unlock(&fts_optimize_wq->mutex);
 }
 
-/** Add a table to fts_slots if it doesn't already exist. */
-static bool fts_optimize_new_table(dict_table_t* table)
-{
-	ut_ad(table);
-
-	ulint		i;
-	fts_slot_t*	slot;
-	fts_slot_t*	empty = NULL;
-
-	/* Search for duplicates, also find a free slot if one exists. */
-	for (i = 0; i < ib_vector_size(fts_slots); ++i) {
-
-		slot = static_cast<fts_slot_t*>(ib_vector_get(fts_slots, i));
-
-		if (!slot->table) {
-			empty = slot;
-		} else if (slot->table == table) {
-			/* Already exists in our optimize queue. */
-			return false;
-		}
-	}
-
-	slot = empty ? empty : static_cast<fts_slot_t*>(
-		ib_vector_push(fts_slots, NULL));
-
-	memset(slot, 0x0, sizeof(*slot));
-
-	slot->table = table;
-	return true;
-}
-
-/** Remove a table from fts_slots if it exists.
-@param remove	table to be removed from fts_slots */
-static bool fts_optimize_del_table(fts_msg_del_t *remove)
-{
-	const dict_table_t* table = remove->table;
-	ut_ad(table);
-	for (ulint i = 0; i < ib_vector_size(fts_slots); ++i) {
-		fts_slot_t*	slot;
-
-		slot = static_cast<fts_slot_t*>(ib_vector_get(fts_slots, i));
-
-		if (slot->table == table) {
-			if (UNIV_UNLIKELY(fts_enable_diag_print)) {
-				ib::info() << "FTS Optimize Removing table "
-					<< table->name;
-			}
-
-			mysql_mutex_lock(&fts_optimize_wq->mutex);
-			table->fts->in_queue = false;
-			pthread_cond_signal(remove->cond);
-			mysql_mutex_unlock(&fts_optimize_wq->mutex);
-			slot->table = NULL;
-			return true;
-		}
-	}
-
-	mysql_mutex_lock(&fts_optimize_wq->mutex);
-	pthread_cond_signal(remove->cond);
-	mysql_mutex_unlock(&fts_optimize_wq->mutex);
-	return false;
-}
-
-/**********************************************************************//**
-Calculate how many tables in fts_slots need to be optimized.
-@return no. of tables to optimize */
-static ulint fts_optimize_how_many()
-{
-	ulint n_tables = 0;
-	const time_t current_time = time(NULL);
-
-	for (ulint i = 0; i < ib_vector_size(fts_slots); ++i) {
-		const fts_slot_t* slot = static_cast<const fts_slot_t*>(
-			ib_vector_get_const(fts_slots, i));
-		if (!slot->table) {
-			continue;
-		}
-
-		const time_t end = slot->running
-			? slot->last_run : slot->completed;
-		ulint interval = ulint(current_time - end);
-
-		if (lint(interval) < 0
-		    || interval >= FTS_OPTIMIZE_INTERVAL_IN_SECS) {
-			++n_tables;
-		}
-	}
-
-	return(n_tables);
-}
-
-/**********************************************************************//**
-Check if the total memory used by all FTS table exceeds the maximum limit.
-@return true if a sync is needed, false otherwise */
-static bool fts_is_sync_needed()
-{
-	ulint		total_memory = 0;
-	const time_t	now = time(NULL);
-	double		time_diff = difftime(now, last_check_sync_time);
-
-	if (fts_need_sync || (time_diff >= 0 && time_diff < 5)) {
-		return(false);
-	}
-
-	last_check_sync_time = now;
-
-	for (ulint i = 0; i < ib_vector_size(fts_slots); ++i) {
-		const fts_slot_t* slot = static_cast<const fts_slot_t*>(
-			ib_vector_get_const(fts_slots, i));
-
-		if (!slot->table) {
-			continue;
-		}
-
-		if (slot->table->fts && slot->table->fts->cache) {
-			total_memory += slot->table->fts->cache->total_size;
-		}
-
-		if (total_memory > fts_max_total_cache_size) {
-			return(true);
-		}
-	}
-
-	return(false);
-}
-
 /** Sync fts cache of a table
 @param[in,out]  table           table to be synced
 @param[in]      process_message processing messages from fts_optimize_wq */
@@ -2781,20 +2709,26 @@ static void fts_optimize_sync_table(dict_table_t *table,
                                     bool process_message= false)
 {
   MDL_ticket* mdl_ticket= nullptr;
+  THD* fts_opt_thd= acquire_fts_thd();
+  ut_ad(fts_opt_thd);
   dict_table_t *sync_table= dict_acquire_mdl_shared<true>(table, fts_opt_thd,
                                                           &mdl_ticket);
 
   if (!sync_table)
-    return;
+  {
+    if (process_message)
+      goto func_exit;
+  }
 
   if (sync_table->fts && sync_table->fts->cache && sync_table->is_accessible())
   {
     fts_sync_table(sync_table, false);
-
+func_exit:
     if (process_message)
     {
       mysql_mutex_lock(&fts_optimize_wq->mutex);
       sync_table->fts->sync_message = false;
+      pthread_cond_broadcast(&table->fts->fts_queue_cond);
       mysql_mutex_unlock(&fts_optimize_wq->mutex);
     }
   }
@@ -2804,137 +2738,291 @@ static void fts_optimize_sync_table(dict_table_t *table,
 
   if (mdl_ticket)
     dict_table_close(sync_table, false, fts_opt_thd, mdl_ticket);
+  release_fts_thd(fts_opt_thd);
 }
 
-/**********************************************************************//**
-Optimize all FTS tables.
-@return Dummy return */
-static void fts_optimize_callback(void *)
+class fts_slots_t
 {
-	ut_ad(!srv_read_only_mode);
-
-	static ulint	current;
-	static bool	done;
-	static ulint	n_optimize;
-
-	if (!fts_optimize_wq || done) {
-		/* Possibly timer initiated callback, can come after FTS_MSG_STOP.*/
-		return;
-	}
-
-	static ulint		n_tables = ib_vector_size(fts_slots);
-
-	while (!done && srv_shutdown_state <= SRV_SHUTDOWN_INITIATED) {
-		/* If there is no message in the queue and we have tables
-		to optimize then optimize the tables. */
+  typedef std::vector<fts_slot_t*, ut_allocator<fts_slot_t*> > fts_slots_vec;
+  fts_slots_vec *m_fts_slots;
+  std::mutex m_mutex;
+  ulint m_n_optimize_tables;
+public:
+  fts_slots_t()
+  {
+    m_fts_slots= new fts_slots_vec();
+  }
 
-		if (!done
-		    && ib_wqueue_is_empty(fts_optimize_wq)
-		    && n_tables > 0
-		    && n_optimize > 0) {
+  ~fts_slots_t()
+  {
+    for (fts_slots_vec::iterator it= m_fts_slots->begin();
+         it != m_fts_slots->end(); it++)
+       delete *it;
+    m_fts_slots->clear();
+    delete m_fts_slots;
+  }
 
-			/* The queue is empty but we have tables
-			to optimize. */
-			if (UNIV_UNLIKELY(wsrep_sst_disable_writes)) {
-retry_later:
-				if (fts_is_sync_needed()) {
-					fts_need_sync = true;
-				}
-				if (n_tables) {
-					timer->set_time(5000, 0);
-				}
-				return;
-			}
+  ulint get_n_optimize_tables()
+  {
+    std::unique_lock<std::mutex> lk(m_mutex);
+    return m_n_optimize_tables;
+  }
 
-			fts_slot_t* slot = static_cast<fts_slot_t*>(
-				ib_vector_get(fts_slots, current));
+  void add_new_table(dict_table_t *table)
+  {
+    std::unique_lock<std::mutex> lk(m_mutex);
+    for (fts_slots_vec::iterator it= m_fts_slots->begin();
+         it != m_fts_slots->end(); it++)
+      if ((*it)->table == table) return;
+    m_fts_slots->push_back(new fts_slot_t(table));
+    mysql_mutex_lock(&fts_optimize_wq->mutex);
+    table->fts->wait_in_queue = false;
+    table->fts->in_queue = true;
+    pthread_cond_broadcast(&table->fts->fts_queue_cond);
+    mysql_mutex_unlock(&fts_optimize_wq->mutex);
+  }
 
-			/* Handle the case of empty slots. */
-			if (slot->table) {
-				slot->running = true;
-				fts_optimize_table_bk(slot);
-			}
+  void delete_table(const dict_table_t *table)
+  {
+    std::unique_lock<std::mutex> lk(m_mutex);
+    for (fts_slots_vec::iterator it= m_fts_slots->begin();
+         it != m_fts_slots->end(); it++)
+    {
+      if ((*it)->table == table)
+      {
+        fts_slot_t *slot= *it;
+	m_fts_slots->erase(it);
+	delete slot;
+        mysql_mutex_lock(&fts_optimize_wq->mutex);
+	table->fts->in_queue= false;
+	mysql_mutex_unlock(&fts_optimize_wq->mutex);
+	return;
+      }
+    }
+  }
 
-			/* Wrap around the counter. */
-			if (++current >= ib_vector_size(fts_slots)) {
-				n_optimize = fts_optimize_how_many();
-				current = 0;
-			}
-		} else if (n_optimize == 0
-			   || !ib_wqueue_is_empty(fts_optimize_wq)) {
-			fts_msg_t* msg = static_cast<fts_msg_t*>
-				(ib_wqueue_nowait(fts_optimize_wq));
-			/* Timeout ? */
-			if (!msg) {
-				goto retry_later;
-			}
+  void update_need_sync()
+  {
+    std::unique_lock<std::mutex> lk(m_mutex);
+    const time_t current_time = time(NULL);
+    m_n_optimize_tables= 0;
+    for (fts_slots_vec::iterator it= m_fts_slots->begin();
+         it != m_fts_slots->end(); it++)
+    {
+      const fts_slot_t* slot= *it;
+      if (!slot->table) continue;
+      const time_t end= slot->running
+                        ? slot->last_run : slot->completed;
+      ulint interval= ulint(current_time - end);
+      if (lint(interval) < 0 ||
+          interval >= FTS_OPTIMIZE_INTERVAL_IN_SECS)
+        ++m_n_optimize_tables;
+     }
+   }
+
+   bool is_sync_need()
+   {
+     std::unique_lock<std::mutex> lk(m_mutex);
+     ulint total_memory= 0;
+     for (fts_slots_vec::iterator it= m_fts_slots->begin();
+          it != m_fts_slots->end(); it++)
+     {
+       const fts_slot_t* slot= *it;
+       if (!slot->table) continue;
+       if (slot->table->fts && slot->table->fts->cache)
+	 total_memory+= slot->table->fts->cache->total_size;
+       if (total_memory > fts_max_total_cache_size) return true;
+     }
+     return false;
+   }
+
+   dict_table_t *get_optimize_table()
+   {
+     std::unique_lock<std::mutex> lk(m_mutex);
+     if (m_fts_slots->empty()) return nullptr;
+     static ulint current= 0;
+     ulint size= m_fts_slots->size();
+     if (current >= size) current= 0;
+     ulint start_val= current;
+     bool iteration= false;
+read_slot:
+     /* Traversed the entire list */
+     if (start_val == current && iteration) { return nullptr; }
+     current++;
+     /* Reset and traverse from start */
+     if (current == size) current = 0;
+     iteration= true;
+     fts_slot_t *slot= m_fts_slots->at(current);
+     if (slot->table && !slot->running)
+     {
+       slot->running = true;
+       const time_t now = time(NULL);
+       const ulint interval = ulint(now - slot->last_run);
+       /* Avoid optimizing tables that were optimized recently. */
+       if (slot->last_run > 0 && lint(interval) >= 0
+           && interval < FTS_OPTIMIZE_INTERVAL_IN_SECS)
+         goto read_slot;
+     } else goto read_slot;
+
+     mysql_mutex_lock(&fts_optimize_wq->mutex);
+     slot->table->fts->in_process= true;
+     mysql_mutex_unlock(&fts_optimize_wq->mutex);
+     return slot->table;
+   }
+
+   void update_slot(dict_table_t *table, bool threshold, dberr_t *err)
+   {
+     std::unique_lock<std::mutex> lk(m_mutex);
+     for (fts_slots_vec::iterator it= m_fts_slots->begin();
+          it != m_fts_slots->end(); it++)
+     {
+       fts_slot_t* slot= *it;
+       if (slot->table != table) continue;
+       else
+       {
+	 slot->last_run= time(nullptr);
+	 slot->running= false;
+
+	 mysql_mutex_lock(&fts_optimize_wq->mutex);
+         table->fts->in_process= false;
+         pthread_cond_broadcast(&table->fts->fts_queue_cond);
+         mysql_mutex_unlock(&fts_optimize_wq->mutex);
+
+	 if (!threshold) return;
+	 if (*err == DB_SUCCESS)
+	   slot->completed= slot->last_run;
+       }
+     }
+   }
+
+   dict_table_t* pop_table()
+   {
+     std::unique_lock<std::mutex> lk(m_mutex);
+     if (m_fts_slots->empty()) return nullptr;
+     dict_table_t *table= (*m_fts_slots->begin())->table;
+     m_fts_slots->erase(m_fts_slots->begin());
+     return table;
+   }
 
-			switch (msg->type) {
-			case FTS_MSG_STOP:
-				done = true;
-				break;
+};
 
-			case FTS_MSG_ADD_TABLE:
-				ut_a(!done);
-				if (fts_optimize_new_table(
-					    static_cast<dict_table_t*>(
-						    msg->ptr))) {
-					++n_tables;
-				}
-				break;
+fts_slots_t *fts_table_slots;
 
-			case FTS_MSG_DEL_TABLE:
-				if (fts_optimize_del_table(
-					    static_cast<fts_msg_del_t*>(
-						    msg->ptr))) {
-					--n_tables;
-				}
-				break;
-			case FTS_MSG_SYNC_TABLE:
-				if (UNIV_UNLIKELY(wsrep_sst_disable_writes)) {
-					add_msg(msg);
-					goto retry_later;
-				}
-
-				DBUG_EXECUTE_IF(
-					"fts_instrument_msg_sync_sleep",
-					std::this_thread::sleep_for(
-						std::chrono::milliseconds(
-							300)););
+static void fts_process_msg(fts_msg_t *msg)
+{
+  switch (msg->type)
+  {
+    case FTS_MSG_STOP:
+      {
+        std::unique_lock<std::mutex> lk(fts_thread_mutex);
+        fts_shutdown_processed= true;
+      }
+      break;
+    case FTS_MSG_ADD_TABLE:
+      fts_table_slots->add_new_table(
+        static_cast<dict_table_t*>(msg->ptr));
+      break;
+    case FTS_MSG_DEL_TABLE:
+    {
+      fts_msg_del_t* remove= static_cast<fts_msg_del_t*>(msg->ptr);
+      fts_table_slots->delete_table(remove->table);
+      mysql_mutex_lock(&fts_optimize_wq->mutex);
+      pthread_cond_signal(remove->cond);
+      mysql_mutex_unlock(&fts_optimize_wq->mutex);
+      break;
+    }
+    case FTS_MSG_SYNC_TABLE:
+      DBUG_EXECUTE_IF("fts_instrument_msg_sync_sleep",
+                      std::this_thread::sleep_for(
+                       std::chrono::milliseconds(300)););
+      fts_optimize_sync_table(
+        static_cast<dict_table_t*>(msg->ptr), true);
+      break;
+    default: ut_error;
+  }
+  mem_heap_free(msg->heap);
+  return;
+}
 
-				fts_optimize_sync_table(
-					static_cast<dict_table_t*>(msg->ptr),
-					true);
-				break;
-			default:
-				ut_error;
-			}
+static void fts_optimize_func(void *)
+{
+  ut_ad(!srv_read_only_mode);
 
-			mem_heap_free(msg->heap);
-			n_optimize = done ? 0 : fts_optimize_how_many();
-		}
-	}
+  std::unique_lock<std::mutex> lk(fts_thread_mutex);
+  /** Acquire mutex to check fts_optimize_wq and how many
+  threads entered this function */
+  if (!fts_optimize_wq || !fts_table_slots)
+    /* Possibly timer initiated callback */
+    return;
 
-	/* Server is being shutdown, sync the data from FTS cache to disk
-	if needed */
-	if (n_tables > 0) {
-		for (ulint i = 0; i < ib_vector_size(fts_slots); i++) {
-			fts_slot_t* slot = static_cast<fts_slot_t*>(
-				ib_vector_get(fts_slots, i));
+  srv_n_fts_threads_entered++;
+  lk.unlock();
+  while (srv_shutdown_state <= SRV_SHUTDOWN_INITIATED)
+  {
+    lk.lock();
+    if (fts_shutdown_processed == true) { lk.unlock(); break; }
+    lk.unlock();
+    ulint n_opt_tables= fts_table_slots->get_n_optimize_tables();
+    if (ib_wqueue_is_empty(fts_optimize_wq))
+    {
+      /* Queue is empty, but we have table to optimize */
+      if (UNIV_UNLIKELY(wsrep_sst_disable_writes))
+      {
+retry_later:
+        if (fts_table_slots->is_sync_need()) fts_need_sync= true;
+	if (n_opt_tables) timer->set_time(5000, 0);
+	lk.lock();
+	srv_n_fts_threads_entered--;
+	return;
+      }
+      if (n_opt_tables == 0) goto retry_later;
+      else
+      {
+        dict_table_t* table= fts_table_slots->get_optimize_table();
+	if (!table) fts_table_slots->update_need_sync();
+	else
+	{
+	  bool threshold= false;
+	  dberr_t err= fts_optimize_table_bk(table, threshold);
+	  fts_table_slots->update_slot(table, threshold, &err);
+	}
+      }
+    }
+    else
+    {
+      fts_msg_t* msg = static_cast<fts_msg_t*>(
+        ib_wqueue_nowait(fts_optimize_wq));
+      if (!msg) { goto retry_later; }
+      else fts_process_msg(msg);
+    }
+  }
 
-			if (slot->table) {
-				fts_optimize_sync_table(slot->table);
-			}
-		}
-	}
+  /* Process all messages from fts_optimize_wq */
+  while (!ib_wqueue_is_empty(fts_optimize_wq))
+  {
+    fts_msg_t *msg= static_cast<fts_msg_t*>(
+      ib_wqueue_nowait(fts_optimize_wq));
+    if (!msg) continue;
+    fts_process_msg(msg);
+  }
 
-	ib_vector_free(fts_slots);
-	mysql_mutex_lock(&fts_optimize_wq->mutex);
-	fts_slots = NULL;
-	pthread_cond_broadcast(&fts_opt_shutdown_cond);
-	mysql_mutex_unlock(&fts_optimize_wq->mutex);
+  /* Sync all tables during shutdown */
+  while (dict_table_t *table= fts_table_slots->pop_table())
+    fts_optimize_sync_table(table);
 
-	ib::info() << "FTS optimize thread exiting.";
+  /* Decrement the working threads */
+  lk.lock();
+  srv_n_fts_threads_entered--;
+  /* Free the slots and broadcast the shutdown signal */
+  if (srv_n_fts_threads_entered == 0)
+  {
+    mysql_mutex_lock(&fts_optimize_wq->mutex);
+    delete fts_table_slots;
+    fts_table_slots= nullptr;
+    pthread_cond_broadcast(&fts_opt_shutdown_cond);
+    mysql_mutex_unlock(&fts_optimize_wq->mutex);
+    ib::info() << "FTS optimize threads exiting.";
+  }
 }
 
 /**********************************************************************//**
@@ -2943,9 +3031,6 @@ void
 fts_optimize_init(void)
 /*===================*/
 {
-	mem_heap_t*	heap;
-	ib_alloc_t*     heap_alloc;
-
 	ut_ad(!srv_read_only_mode);
 
 	/* For now we only support one optimize thread. */
@@ -2953,14 +3038,10 @@ fts_optimize_init(void)
 
 	/* Create FTS optimize work queue */
 	fts_optimize_wq = ib_wqueue_create();
+	fts_table_slots= new fts_slots_t();
+	fts_set_n_threads(innodb_n_fts_threads);
 	timer = srv_thread_pool->create_timer(timer_callback);
 
-	/* Create FTS vector to store fts_slot_t */
-	heap = mem_heap_create(sizeof(dict_table_t*) * 64);
-	heap_alloc = ib_heap_allocator_create(heap);
-	fts_slots = ib_vector_create(heap_alloc, sizeof(fts_slot_t), 4);
-
-	fts_opt_thd = innobase_create_background_thd("InnoDB FTS optimizer");
 	/* Add fts tables to fts_slots which could be skipped
 	during dict_load_table_one() because fts_optimize_thread
 	wasn't even started. */
@@ -2976,7 +3057,7 @@ fts_optimize_init(void)
 		need to acquire fts_optimize_wq->mutex for adding the fts
 		table to the fts slots. */
 		ut_ad(!table->can_be_evicted);
-		fts_optimize_new_table(table);
+		fts_table_slots->add_new_table(table);
 		table->fts->in_queue = true;
 	}
 	dict_sys.unfreeze();
@@ -3005,23 +3086,27 @@ fts_optimize_shutdown()
 	can't delete the work queue here because the add thread needs
 	deregister the FTS tables. */
 	timer->disarm();
-	task_group.cancel_pending(&task);
+	task_group->cancel_pending(task);
 
 	add_msg(fts_optimize_create_msg(FTS_MSG_STOP, nullptr));
-
-	while (fts_slots) {
+	while (fts_table_slots) {
 		my_cond_wait(&fts_opt_shutdown_cond,
 			     &fts_optimize_wq->mutex.m_mutex);
 	}
 
-	destroy_background_thd(fts_opt_thd);
-	fts_opt_thd = NULL;
+	while (!fts_thds.empty()) {
+	   delete fts_thds.front();
+	   fts_thds.pop_front();
+	}
+
 	pthread_cond_destroy(&fts_opt_shutdown_cond);
 	mysql_mutex_unlock(&fts_optimize_wq->mutex);
 
 	ib_wqueue_free(fts_optimize_wq);
 	fts_optimize_wq = NULL;
 
+	delete task_group;
+	delete task;
 	delete timer;
 	timer = NULL;
 }
@@ -3032,15 +3117,23 @@ void fts_sync_during_ddl(dict_table_t* table)
 {
   if (!fts_optimize_wq)
     return;
-  mysql_mutex_lock(&fts_optimize_wq->mutex);
-  const auto sync_message= table->fts->sync_message;
-  mysql_mutex_unlock(&fts_optimize_wq->mutex);
-  if (!sync_message)
-    return;
 
   fts_sync_table(table, false);
+}
 
-  mysql_mutex_lock(&fts_optimize_wq->mutex);
-  table->fts->sync_message = false;
-  mysql_mutex_unlock(&fts_optimize_wq->mutex);
+void fts_set_n_threads(const uint new_cnt)
+{
+  std::unique_lock<std::mutex> lk(fts_threads_cnt_mutex);
+  if (task_group)
+    task_group->set_max_tasks(new_cnt);
+  else
+  {
+    task_group= new tpool::task_group(new_cnt);
+    task= new tpool::task(fts_optimize_func,0,task_group);
+    for (uint i= 0; i < new_cnt; i++)
+       fts_thds.push_back(
+         new fts_thd_t(
+	   innobase_create_background_thd("InnoDB fts threads")));
+  }
+  innodb_n_fts_threads= new_cnt;
 }
diff --git a/storage/innobase/handler/ha_innodb.cc b/storage/innobase/handler/ha_innodb.cc
index f102789d7ab..3d7fd9a9969 100644
--- a/storage/innobase/handler/ha_innodb.cc
+++ b/storage/innobase/handler/ha_innodb.cc
@@ -206,6 +206,8 @@ my_bool innodb_evict_tables_on_commit_debug;
 /** File format constraint for ALTER TABLE */
 ulong innodb_instant_alter_column_allowed;
 
+uint innodb_n_fts_threads;
+
 /** Note we cannot use rec_format_enum because we do not allow
 COMPRESSED row format for innodb_default_row_format option. */
 enum default_row_format_enum {
@@ -18603,6 +18605,12 @@ innodb_encrypt_tables_update(THD*, st_mysql_sys_var*, void*, const void* save)
 	mysql_mutex_lock(&LOCK_global_system_variables);
 }
 
+static void innodb_fts_threads_update(
+	THD*,st_mysql_sys_var*,void*, const void* save)
+{
+  fts_set_n_threads(*static_cast<const uint*>(save));
+}
+
 static SHOW_VAR innodb_status_variables_export[]= {
 	SHOW_FUNC_ENTRY("Innodb", &show_innodb_vars),
 	{NullS, NullS, SHOW_LONG}
@@ -19747,6 +19755,13 @@ static MYSQL_SYSVAR_BOOL(encrypt_temporary_tables, innodb_encrypt_temporary_tabl
   "Enrypt the temporary table data.",
   NULL, NULL, false);
 
+static MYSQL_SYSVAR_UINT(fts_threads, innodb_n_fts_threads,
+			 PLUGIN_VAR_RQCMDARG,
+			 "Number of threads performing background fts operation ",
+			 NULL,
+			 innodb_fts_threads_update,
+			 2, 1, 255, 0);
+
 static struct st_mysql_sys_var* innobase_system_variables[]= {
   MYSQL_SYSVAR(autoextend_increment),
   MYSQL_SYSVAR(buffer_pool_size),
@@ -19913,6 +19928,7 @@ static struct st_mysql_sys_var* innobase_system_variables[]= {
   MYSQL_SYSVAR(buf_dump_status_frequency),
   MYSQL_SYSVAR(background_thread),
   MYSQL_SYSVAR(encrypt_temporary_tables),
+  MYSQL_SYSVAR(fts_threads),
 
   NULL
 };
diff --git a/storage/innobase/include/fts0fts.h b/storage/innobase/include/fts0fts.h
index 6aa6f8ceb18..c47366bd8c9 100644
--- a/storage/innobase/include/fts0fts.h
+++ b/storage/innobase/include/fts0fts.h
@@ -332,7 +332,11 @@ class fts_t {
 	/** Vector of FTS indexes, this is mainly for caching purposes. */
 	ib_vector_t*	indexes;
 
-	/** Whether the table exists in fts_optimize_wq;
+	/** Whether the addition of new table message in
+	fts_optimize_wq; protected by fts_optimize_wq mutex */
+	bool		wait_in_queue;
+
+	/** Whether the table is in fts_optimize_wq;
 	protected by fts_optimize_wq mutex */
 	bool		in_queue;
 
@@ -340,6 +344,15 @@ class fts_t {
 	protected by fts_optimize_wq mutex */
 	bool		sync_message;
 
+	/** Whether the table is picked by fts threads
+	protected by fts_optimize_wq mutex */
+	bool		in_process;
+
+	/** Condition variable to wake up the background thread
+	when table is in fts queue or sync or in_process
+	condition */
+	pthread_cond_t	fts_queue_cond;
+
 	/** Heap for fts_t allocation. */
 	mem_heap_t*	fts_heap;
 };
@@ -943,3 +956,6 @@ fts_update_sync_doc_id(const dict_table_t *table,
 /** Sync the table during commit phase
 @param[in]	table	table to be synced */
 void fts_sync_during_ddl(dict_table_t* table);
+
+/** Set the number of fts threads */
+void fts_set_n_threads(const uint new_cnt);
diff --git a/storage/innobase/include/fts0types.h b/storage/innobase/include/fts0types.h
index 1482080c09e..c34ad52ac3d 100644
--- a/storage/innobase/include/fts0types.h
+++ b/storage/innobase/include/fts0types.h
@@ -176,8 +176,9 @@ struct fts_node_t {
 	ulint		ilist_size_alloc;
 					/*!< Allocated size of ilist in
 					bytes */
-	bool		synced;		/*!< flag whether the node is
-synced */
+	/** Flag to indicate whether node is synced.
+	Protected by cache->lock */
+	bool		synced;
 };
 
 /** A tokenizer word. Contains information about one word. */
diff --git a/storage/innobase/include/srv0srv.h b/storage/innobase/include/srv0srv.h
index 96cfe886c02..69af8de3732 100644
--- a/storage/innobase/include/srv0srv.h
+++ b/storage/innobase/include/srv0srv.h
@@ -383,6 +383,7 @@ extern ulong	srv_max_purge_lag_delay;
 extern my_bool	innodb_encrypt_temporary_tables;
 
 extern my_bool  srv_immediate_scrub_data_uncompressed;
+
 /*-------------------------------------------*/
 
 /** Modes of operation */
