commit 2a90c71e564506236da402aa2110d6e905dd6b78
Author: Marko Mäkelä <marko.makela@mariadb.com>
Date:   Mon May 24 18:07:08 2021 +0300

    MDEV-25113 WIP
    
    FIXME: mariabackup.huge_lsn mariabackup.mdev-14447

diff --git a/storage/innobase/buf/buf0flu.cc b/storage/innobase/buf/buf0flu.cc
index fb3a9687cfc..aa74fb0b0fb 100644
--- a/storage/innobase/buf/buf0flu.cc
+++ b/storage/innobase/buf/buf0flu.cc
@@ -63,8 +63,11 @@ static constexpr ulint buf_flush_lsn_scan_factor = 3;
 /** Average redo generation rate */
 static lsn_t lsn_avg_rate = 0;
 
-/** Target oldest_modification for the page cleaner; writes are protected by
-buf_pool.flush_list_mutex */
+/** Target oldest_modification for the page cleaner background flushing;
+writes are protected by buf_pool.flush_list_mutex */
+static Atomic_relaxed<lsn_t> buf_flush_async_lsn;
+/** Target oldest_modification for the page cleaner furious flushing;
+writes are protected by buf_pool.flush_list_mutex */
 static Atomic_relaxed<lsn_t> buf_flush_sync_lsn;
 
 #ifdef UNIV_PFS_THREAD
@@ -1795,9 +1798,10 @@ ATTRIBUTE_COLD void buf_flush_wait_flushed(lsn_t sync_lsn)
     log_checkpoint();
 }
 
-/** If innodb_flush_sync=ON, initiate a furious flush.
-@param lsn buf_pool.get_oldest_modification(LSN_MAX) target */
-void buf_flush_ahead(lsn_t lsn)
+/** Initiate more eager page flushing if the log checkpoint age is too high.
+@param lsn      buf_pool.get_oldest_modification(LSN_MAX) target
+@param furious  true=furious flushing, false=limit to innodb_io_capacity */
+ATTRIBUTE_COLD void buf_flush_ahead(lsn_t lsn, bool furious)
 {
   mysql_mutex_assert_not_owner(&log_sys.mutex);
   ut_ad(!srv_read_only_mode);
@@ -1805,14 +1809,15 @@ void buf_flush_ahead(lsn_t lsn)
   if (recv_recovery_is_on())
     recv_sys.apply(true);
 
-  if (buf_flush_sync_lsn < lsn)
+  Atomic_relaxed<lsn_t> &limit= furious
+    ? buf_flush_sync_lsn : buf_flush_async_lsn;
+
+  if (limit < lsn)
   {
     mysql_mutex_lock(&buf_pool.flush_list_mutex);
-    if (buf_flush_sync_lsn < lsn)
-    {
-      buf_flush_sync_lsn= lsn;
-      pthread_cond_signal(&buf_pool.do_flush_list);
-    }
+    if (limit < lsn)
+      limit= lsn;
+    pthread_cond_signal(&buf_pool.do_flush_list);
     mysql_mutex_unlock(&buf_pool.flush_list_mutex);
   }
 }
@@ -1887,6 +1892,8 @@ ATTRIBUTE_COLD static void buf_flush_sync_for_checkpoint(lsn_t lsn)
 
     if (measure >= target)
       buf_flush_sync_lsn= 0;
+    else if (measure >= buf_flush_async_lsn)
+      buf_flush_async_lsn= 0;
 
     /* wake up buf_flush_wait_flushed() */
     pthread_cond_broadcast(&buf_pool.done_flush_list);
@@ -1906,7 +1913,7 @@ static bool af_needed_for_redo(lsn_t oldest_lsn)
 {
   lsn_t age= (log_sys.get_lsn() - oldest_lsn);
   lsn_t af_lwm= static_cast<lsn_t>(srv_adaptive_flushing_lwm *
-   static_cast<double>(log_sys.log_capacity) / 100);
+    static_cast<double>(log_sys.log_capacity) / 100);
 
   /* if age > af_lwm adaptive flushing is recommended */
   return (age > af_lwm);
@@ -2117,7 +2124,6 @@ static os_thread_ret_t DECLARE_THREAD(buf_flush_page_cleaner)(void*)
     else if (srv_shutdown_state > SRV_SHUTDOWN_INITIATED)
       break;
 
-
-    /* If buf pager cleaner is idle and there is no work
+    /* If buf page cleaner is idle and there is no work
     (either dirty pages are all flushed or adaptive flushing
     is not enabled) then opt for non-timed wait */
@@ -2131,6 +2137,7 @@ static os_thread_ret_t DECLARE_THREAD(buf_flush_page_cleaner)(void*)
 
     set_timespec(abstime, 1);
 
+    lsn_t soft_lsn_limit= buf_flush_async_lsn;
     lsn_limit= buf_flush_sync_lsn;
 
     if (UNIV_UNLIKELY(lsn_limit != 0))
@@ -2152,6 +2159,7 @@ static os_thread_ret_t DECLARE_THREAD(buf_flush_page_cleaner)(void*)
         pthread_cond_broadcast(&buf_pool.done_flush_list);
       }
 unemployed:
+      buf_flush_async_lsn= 0;
       buf_pool.page_cleaner_set_idle(true);
       continue;
     }
@@ -2168,7 +2176,7 @@ static os_thread_ret_t DECLARE_THREAD(buf_flush_page_cleaner)(void*)
 
     bool idle_flush= false;
 
-    if (lsn_limit);
+    if (lsn_limit || soft_lsn_limit);
     else if (af_needed_for_redo(oldest_lsn));
     else if (srv_max_dirty_pages_pct_lwm != 0.0)
     {
@@ -2193,11 +2201,16 @@ static os_thread_ret_t DECLARE_THREAD(buf_flush_page_cleaner)(void*)
       goto unemployed;
 
     if (UNIV_UNLIKELY(lsn_limit != 0) && oldest_lsn >= lsn_limit)
-      buf_flush_sync_lsn= 0;
+      lsn_limit= buf_flush_sync_lsn= 0;
+    if (UNIV_UNLIKELY(soft_lsn_limit != 0) && oldest_lsn >= soft_lsn_limit)
+      soft_lsn_limit= buf_flush_async_lsn= 0;
 
     buf_pool.page_cleaner_set_idle(false);
     mysql_mutex_unlock(&buf_pool.flush_list_mutex);
 
+    if (!lsn_limit)
+      lsn_limit= soft_lsn_limit;
+
     ulint n_flushed;
 
     if (UNIV_UNLIKELY(lsn_limit != 0))
@@ -2248,7 +2261,7 @@ static os_thread_ret_t DECLARE_THREAD(buf_flush_page_cleaner)(void*)
         goto do_checkpoint;
       }
     }
-    else
+    else if (buf_flush_async_lsn <= oldest_lsn)
     {
       mysql_mutex_lock(&buf_pool.flush_list_mutex);
       goto unemployed;
@@ -2305,6 +2318,7 @@ ATTRIBUTE_COLD void buf_flush_page_cleaner_init()
   ut_ad(srv_operation == SRV_OPERATION_NORMAL ||
         srv_operation == SRV_OPERATION_RESTORE ||
         srv_operation == SRV_OPERATION_RESTORE_EXPORT);
+  buf_flush_async_lsn= 0;
   buf_flush_sync_lsn= 0;
   buf_page_cleaner_is_active= true;
   os_thread_create(buf_flush_page_cleaner);
diff --git a/storage/innobase/include/buf0flu.h b/storage/innobase/include/buf0flu.h
index 1dd0d35793b..18e569888dc 100644
--- a/storage/innobase/include/buf0flu.h
+++ b/storage/innobase/include/buf0flu.h
@@ -104,9 +104,10 @@ void buf_flush_wait_batch_end(bool lru);
 /** Wait until all persistent pages are flushed up to a limit.
 @param sync_lsn   buf_pool.get_oldest_modification(LSN_MAX) to wait for */
 ATTRIBUTE_COLD void buf_flush_wait_flushed(lsn_t sync_lsn);
-/** If innodb_flush_sync=ON, initiate a furious flush.
-@param lsn buf_pool.get_oldest_modification(LSN_MAX) target */
-void buf_flush_ahead(lsn_t lsn);
+/** Initiate more eager page flushing if the log checkpoint age is too high.
+@param lsn      buf_pool.get_oldest_modification(LSN_MAX) target
+@param furious  true=furious flushing, false=limit to innodb_io_capacity */
+ATTRIBUTE_COLD void buf_flush_ahead(lsn_t lsn, bool furious);
 
 /********************************************************************//**
 This function should be called at a mini-transaction commit, if a page was
diff --git a/storage/innobase/include/mtr0mtr.h b/storage/innobase/include/mtr0mtr.h
index 97bfa2e53cb..a9539000602 100644
--- a/storage/innobase/include/mtr0mtr.h
+++ b/storage/innobase/include/mtr0mtr.h
@@ -588,6 +588,17 @@ struct mtr_t {
   @return number of buffer count added by this mtr */
   uint32_t get_fix_count(const buf_block_t *block) const;
 
+  /** Type of page flushing that needs to be initiated during commit() */
+  enum page_flush_ahead
+  {
+    /** no need to trigger the page cleaner */
+    PAGE_FLUSH_NO= 0,
+    /** asynchronous flushing is needed: buf_flush_ahead(lsn, false) */
+    PAGE_FLUSH_ASYNC,
+    /** furious flushing is needed: buf_flush_ahead(lsn, true) */
+    PAGE_FLUSH_SYNC
+  };
+
 private:
   /** Log a write of a byte string to a page.
   @param block   buffer page
@@ -621,7 +632,7 @@ struct mtr_t {
   /** Append the redo log records to the redo log buffer.
   @param len   number of bytes to write
   @return {start_lsn,flush_ahead} */
-  inline std::pair<lsn_t,bool> finish_write(ulint len);
+  inline std::pair<lsn_t,page_flush_ahead> finish_write(ulint len);
 
   /** Release the resources */
   inline void release_resources();
diff --git a/storage/innobase/mtr/mtr0mtr.cc b/storage/innobase/mtr/mtr0mtr.cc
index f7bde06544f..f6787962cf9 100644
--- a/storage/innobase/mtr/mtr0mtr.cc
+++ b/storage/innobase/mtr/mtr0mtr.cc
@@ -402,12 +402,12 @@ void mtr_t::commit()
   {
     ut_ad(!srv_read_only_mode || m_log_mode == MTR_LOG_NO_REDO);
 
-    std::pair<lsn_t,bool> lsns;
+    std::pair<lsn_t,page_flush_ahead> lsns;
 
     if (const ulint len= prepare_write())
       lsns= finish_write(len);
     else
-      lsns= { m_commit_lsn, false };
+      lsns= { m_commit_lsn, PAGE_FLUSH_NO };
 
     if (m_made_dirty)
       mysql_mutex_lock(&log_sys.flush_order_mutex);
@@ -447,8 +447,8 @@ void mtr_t::commit()
 
     m_memo.for_each_block_in_reverse(CIterate<ReleaseLatches>());
 
-    if (lsns.second)
-      buf_flush_ahead(m_commit_lsn);
+    if (UNIV_UNLIKELY(lsns.second != PAGE_FLUSH_NO))
+      buf_flush_ahead(m_commit_lsn, lsns.second == PAGE_FLUSH_SYNC);
 
     if (m_made_dirty)
       srv_stats.log_write_requests.inc();
@@ -754,7 +754,7 @@ static void log_write_low(const void *str, size_t size)
 
 /** Close the log at mini-transaction commit.
-@return whether buffer pool flushing is needed */
+@return the type of buffer pool flushing that is needed, if any */
-static bool log_close(lsn_t lsn)
+static mtr_t::page_flush_ahead log_close(lsn_t lsn)
 {
   mysql_mutex_assert_owner(&log_sys.mutex);
   ut_ad(lsn == log_sys.get_lsn());
@@ -790,11 +790,13 @@ static bool log_close(lsn_t lsn)
 		  << log_sys.log_capacity << ".";
     }
   }
+  else if (UNIV_LIKELY(checkpoint_age <= log_sys.max_modified_age_async))
+    return mtr_t::PAGE_FLUSH_NO;
   else if (UNIV_LIKELY(checkpoint_age <= log_sys.max_checkpoint_age))
-    return false;
+    return mtr_t::PAGE_FLUSH_ASYNC;
 
   log_sys.set_check_flush_or_checkpoint();
-  return true;
+  return mtr_t::PAGE_FLUSH_SYNC;
 }
 
 /** Write the block contents to the REDO log */
@@ -858,8 +860,8 @@ inline ulint mtr_t::prepare_write()
 
 /** Append the redo log records to the redo log buffer.
 @param len   number of bytes to write
-@return {start_lsn,flush_ahead_lsn} */
-inline std::pair<lsn_t,bool> mtr_t::finish_write(ulint len)
+@return {start_lsn,flush_ahead} */
+inline std::pair<lsn_t,mtr_t::page_flush_ahead> mtr_t::finish_write(ulint len)
 {
 	ut_ad(m_log_mode == MTR_LOG_ALL);
 	mysql_mutex_assert_owner(&log_sys.mutex);
@@ -875,19 +877,19 @@ inline std::pair<lsn_t,bool> mtr_t::finish_write(ulint len)
 		m_commit_lsn = log_reserve_and_write_fast(front->begin(), len,
 							  &start_lsn);
 
-		if (m_commit_lsn) {
-			return std::make_pair(start_lsn, false);
+		if (!m_commit_lsn) {
+			goto piecewise;
 		}
+	} else {
+piecewise:
+		/* Open the database log for log_write_low */
+		start_lsn = log_reserve_and_open(len);
+		mtr_write_log write_log;
+		m_log.for_each_block(write_log);
+		m_commit_lsn = log_sys.get_lsn();
 	}
-
-	/* Open the database log for log_write_low */
-	start_lsn = log_reserve_and_open(len);
-
-	mtr_write_log write_log;
-	m_log.for_each_block(write_log);
-	m_commit_lsn = log_sys.get_lsn();
-	bool flush = log_close(m_commit_lsn);
-	DBUG_EXECUTE_IF("ib_log_flush_ahead", flush=true;);
+	page_flush_ahead flush= log_close(m_commit_lsn);
+	DBUG_EXECUTE_IF("ib_log_flush_ahead", flush = PAGE_FLUSH_SYNC;);
 
 	return std::make_pair(start_lsn, flush);
 }
