From 3f386559e47b6932e3a2e4d72780c5c36d42cbf6 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Marko=20M=C3=A4kel=C3=A4?= <marko.makela@mariadb.com>
Date: Mon, 4 Nov 2019 17:13:26 +0200
Subject: [PATCH] WIP: Remove recv_sys.buf

FIXME: Remove MEM_HEAP_FOR_RECV_SYS
---
 storage/innobase/buf/buf0buf.cc     |   5 -
 storage/innobase/include/log0recv.h |  28 ++----
 storage/innobase/include/mem0mem.h  |   1 -
 storage/innobase/log/log0recv.cc    | 137 +++-------------------------
 4 files changed, 24 insertions(+), 147 deletions(-)

diff --git a/storage/innobase/buf/buf0buf.cc b/storage/innobase/buf/buf0buf.cc
index 41d242a9360..0dd3324a11b 100644
--- a/storage/innobase/buf/buf0buf.cc
+++ b/storage/innobase/buf/buf0buf.cc
@@ -1301,11 +1301,6 @@ buf_madvise_do_dump()
 			      srv_log_buffer_size * 2,
 			      MADV_DODUMP);
 	}
-	/* mirrors recv_sys_t::create() */
-	if (recv_sys.buf)
-	{
-		ret+= madvise(recv_sys.buf, recv_sys.len, MADV_DODUMP);
-	}
 
 	buf_pool_mutex_enter_all();
 
diff --git a/storage/innobase/include/log0recv.h b/storage/innobase/include/log0recv.h
index 1ba916d3315..8126d293a8d 100644
--- a/storage/innobase/include/log0recv.h
+++ b/storage/innobase/include/log0recv.h
@@ -136,15 +136,17 @@ extern void (*log_file_op)(ulint space_id, const byte* flags,
 /** Stored redo log record */
 struct log_rec_t
 {
-  log_rec_t(lsn_t lsn) : next(NULL), lsn(lsn) {}
+  log_rec_t(lsn_t lsn, const byte* recs) : lsn(lsn), recs(recs) {}
   log_rec_t()= delete;
   log_rec_t(const log_rec_t&)= delete;
   log_rec_t &operator=(const log_rec_t&)= delete;
 
   /** next record */
-  log_rec_t *next;
+  log_rec_t *next= NULL;
   /** mtr_t::commit_lsn() of the mini-transaction */
   const lsn_t lsn;
+  /** the log records */
+  const byte* recs;
 };
 
 struct recv_dblwr_t {
@@ -248,9 +250,10 @@ struct recv_sys_t{
 	bool		apply_log_recs;
 	/** whether recv_apply_hashed_log_recs() is running */
 	bool		apply_batch_on;
-	byte*		buf;	/*!< buffer for parsing log records */
-	size_t		buf_size;	/*!< size of buf */
-	ulint		len;	/*!< amount of data in buf */
+	/** amount of data in log_sys.buf */
+	ulint		len;
+	/** start offset of not-yet-parsed log in log_sys.buf */
+	ulint		recovered_offset;
 	lsn_t		parse_start_lsn;
 				/*!< this is the lsn from which we were able to
 				start parsing log records and adding them to
@@ -262,9 +265,6 @@ struct recv_sys_t{
 	ulint		scanned_checkpoint_no;
 				/*!< the log data has been scanned up to this
 				checkpoint number (lowest 4 bytes) */
-	ulint		recovered_offset;
-				/*!< start offset of non-parsed log records in
-				buf */
 	lsn_t		recovered_lsn;
 				/*!< the log records have been parsed up to
 				this lsn */
@@ -281,8 +281,6 @@ struct recv_sys_t{
 				record, or 0 if none was parsed */
 	/** the time when progress was last reported */
 	time_t		progress_time;
-	mem_heap_t*	heap;	/*!< memory heap of log records and file
-				addresses*/
 
 	using map = std::map<const page_id_t, page_recv_t,
 			     std::less<const page_id_t>,
@@ -320,7 +318,7 @@ struct recv_sys_t{
 	/** Clean up after create() */
 	void close();
 
-	bool is_initialised() const { return buf_size != 0; }
+	bool is_initialised() const { return progress_time != 0; }
 
 	/** Store a redo log record for applying.
 	@param type	record type
@@ -381,14 +379,6 @@ number (FIL_PAGE_LSN) is in the future.  Initially FALSE, and set by
 recv_recovery_from_checkpoint_start(). */
 extern bool		recv_lsn_checks_on;
 
-/** Size of the parsing buffer; it must accommodate RECV_SCAN_SIZE many
-times! */
-#define RECV_PARSING_BUF_SIZE	(2U << 20)
-
-/** Size of block reads when the log groups are scanned forward to do a
-roll-forward */
-#define RECV_SCAN_SIZE		(4U << srv_page_size_shift)
-
 /** This many frames must be left free in the buffer pool when we scan
 the log and store the scanned log records in the buffer pool: we will
 use these free frames to read in pages when we start applying the
diff --git a/storage/innobase/include/mem0mem.h b/storage/innobase/include/mem0mem.h
index 6d0f95cba19..78b8db83cd5 100644
--- a/storage/innobase/include/mem0mem.h
+++ b/storage/innobase/include/mem0mem.h
@@ -59,7 +59,6 @@ buffer pool; the latter method is used for very big heaps */
 /** Different type of heaps in terms of which datastructure is using them */
 #define MEM_HEAP_FOR_BTR_SEARCH		(MEM_HEAP_BTR_SEARCH | MEM_HEAP_BUFFER)
 #define MEM_HEAP_FOR_PAGE_HASH		(MEM_HEAP_DYNAMIC)
-#define MEM_HEAP_FOR_RECV_SYS		(MEM_HEAP_BUFFER)
 #define MEM_HEAP_FOR_LOCK_HEAP		(MEM_HEAP_BUFFER)
 
 /** The following start size is used for the first block in the memory heap if
diff --git a/storage/innobase/log/log0recv.cc b/storage/innobase/log/log0recv.cc
index 74dfbea1af3..11f1a7030e4 100644
--- a/storage/innobase/log/log0recv.cc
+++ b/storage/innobase/log/log0recv.cc
@@ -56,10 +56,6 @@ Created 9/20/1997 Heikki Tuuri
 #include "srv0srv.h"
 #include "srv0start.h"
 
-/** Log records are stored in the hash table in chunks at most of this size;
-this must be less than srv_page_size as it is stored in the buffer pool */
-#define RECV_DATA_BLOCK_SIZE	(MEM_MAX_ALLOC_IN_BUF - sizeof(recv_t::data_t) - REDZONE_SIZE)
-
 /** Read-ahead area in applying log records to file pages */
 #define RECV_READ_AHEAD_AREA	32U
 
@@ -131,9 +127,11 @@ struct recv_t : public log_rec_t
     @param type      redo log record chunk
     @param start_lsn start LSN of the mini-transaction
     @param end_lsn   end LSN of the mini-transaction
+    @param recs      redo log records
   */
-  recv_t(uint32_t len, mlog_id_t type, lsn_t start_lsn, lsn_t end_lsn) :
-    log_rec_t(end_lsn), len(len), type(type), start_lsn(start_lsn), data(NULL)
+  recv_t(uint32_t len, mlog_id_t type, lsn_t start_lsn, lsn_t end_lsn,
+         const byte* recs) :
+    log_rec_t(end_lsn, recs), len(len), type(type), start_lsn(start_lsn)
   {}
 
   /** log record body length in bytes */
@@ -142,31 +140,6 @@ struct recv_t : public log_rec_t
   const mlog_id_t type;
   /** start LSN of the mini-transaction (not necessarily of this record) */
   const lsn_t start_lsn;
-  /** log record */
-  struct data_t
-  {
-    /** pointer to the next chunk, or NULL for the last chunk.  The
-    log record data (at most RECV_DATA_BLOCK_SIZE bytes per chunk)
-    is stored immediately after this field. */
-    data_t *next= NULL;
-
-    data_t() {}
-    /**
-      Constructor.
-      @param chunk   redo log record chunk
-      @param len     length of the chunk, in bytes
-    */
-    data_t(const void* chunk, size_t len)
-    {
-      memcpy(reinterpret_cast<void*>(this + 1), chunk, len);
-    }
-
-    /**
-      Append a log snippet.
-      @param d  log snippet
-    */
-    void append(data_t *d) { ut_ad(!next); ut_ad(!d->next); next= d; }
-  }* data;
 };
 
 
@@ -698,11 +671,6 @@ void recv_sys_t::close()
 		dblwr.pages.clear();
 		pages.clear();
 
-		if (heap) {
-			mem_heap_free(heap);
-			heap = NULL;
-		}
-
 		if (flush_start) {
 			os_event_destroy(flush_start);
 		}
@@ -711,12 +679,6 @@ void recv_sys_t::close()
 			os_event_destroy(flush_end);
 		}
 
-		if (buf) {
-			ut_free_dodump(buf, buf_size);
-			buf = NULL;
-		}
-
-		buf_size = 0;
 		mutex_free(&writer_mutex);
 		mutex_free(&mutex);
 	}
@@ -810,8 +772,6 @@ void recv_sys_t::create()
 	mutex_create(LATCH_ID_RECV_SYS, &mutex);
 	mutex_create(LATCH_ID_RECV_WRITER, &writer_mutex);
 
-	heap = mem_heap_create_typed(256, MEM_HEAP_FOR_RECV_SYS);
-
 	if (!srv_read_only_mode) {
 		flush_start = os_event_create(0);
 		flush_end = os_event_create(0);
@@ -828,8 +788,6 @@ void recv_sys_t::create()
 		recv_n_pool_free_frames = 512;
 	}
 
-	buf = static_cast<byte*>(ut_malloc_dontdump(RECV_PARSING_BUF_SIZE));
-	buf_size = RECV_PARSING_BUF_SIZE;
 	len = 0;
 	parse_start_lsn = 0;
 	scanned_lsn = 0;
@@ -852,7 +810,6 @@ inline void recv_sys_t::clear()
 {
 	ut_ad(mutex_own(&mutex));
 	pages.clear();
-	mem_heap_empty(heap);
 }
 
 /** Free most recovery data structures. */
@@ -863,11 +820,8 @@ void recv_sys_t::debug_free()
 	mutex_enter(&mutex);
 
 	pages.clear();
-	mem_heap_free(heap);
-	ut_free_dodump(buf, buf_size);
 
-	buf = NULL;
-	heap = NULL;
+	progress_time = 0;
 
 	/* wake page cleaner up to progress */
 	if (!srv_read_only_mode) {
@@ -1768,30 +1722,12 @@ inline void recv_sys_t::add(mlog_id_t type, const page_id_t page_id,
     break;
   }
 
-  /* Store the log record body in limited-size chunks, because the
-  heap grows into the buffer pool. */
   uint32_t len= uint32_t(rec_end - body);
   const uint32_t chunk_limit= static_cast<uint32_t>(RECV_DATA_BLOCK_SIZE);
 
-  recv_t *recv= new (mem_heap_alloc(heap, sizeof(recv_t)))
-    recv_t(len, type, lsn, end_lsn);
+  recv_t* recv= new (mem_heap_alloc(heap, sizeof(recv_t)))
+    recv_t(len, type, lsn, end_lsn, body);
   recs.log.append(recv);
-
-  for (recv_t::data_t *prev= NULL;;) {
-    const uint32_t l= std::min(len, chunk_limit);
-    recv_t::data_t *d= new (mem_heap_alloc(heap, sizeof(recv_t::data_t) + l))
-      recv_t::data_t(body, l);
-    if (prev)
-      prev->append(d);
-    else
-      recv->data= d;
-    prev= d;
-
-    body+= l;
-    len-= l;
-    if (!len)
-      break;
-  }
 }
 
 /** Trim old log records for a page
@@ -1821,28 +1757,6 @@ inline void page_recv_t::will_not_read()
 }
 
 
-/*********************************************************************//**
-Copies the log record body from recv to buf. */
-static ATTRIBUTE_COLD
-void
-recv_data_copy_to_buf(
-/*==================*/
-	byte*	buf,	/*!< in: buffer of length at least recv->len */
-	const recv_t& recv)	/*!< in: log record */
-{
-	const recv_t::data_t* recv_data = recv.data;
-	ulint len = recv.len;
-	const ulint chunk_limit = static_cast<ulint>(RECV_DATA_BLOCK_SIZE);
-
-	do {
-		const ulint l = std::min(len, chunk_limit);
-		memcpy(buf, reinterpret_cast<const byte*>(recv_data + 1), l);
-		recv_data = recv_data->next;
-		buf += l;
-		len -= l;
-	} while (len);
-}
-
 /** Apply the hashed log records to the page, if the page lsn is less than the
 lsn of a log record.
 @param[in,out]	block		buffer pool page
@@ -1888,7 +1802,6 @@ static void recv_recover_page(buf_block_t* block, mtr_t& mtr,
 	lsn_t start_lsn = 0, end_lsn = 0;
 	ut_d(lsn_t recv_start_lsn = 0);
 	const lsn_t init_lsn = init ? init->lsn : 0;
-	const ulint chunk_limit = static_cast<ulint>(RECV_DATA_BLOCK_SIZE);
 
 	for (const log_rec_t* l : p->second.log) {
 		const recv_t* recv = static_cast<const recv_t*>(l);
@@ -1932,23 +1845,8 @@ static void recv_recover_page(buf_block_t* block, mtr_t& mtr,
 				 << " len " << recv->len
 				 << " page " << block->page.id);
 
-			byte* buf;
-			const byte* recs;
-
-			if (UNIV_UNLIKELY(recv->len > chunk_limit)) {
-				/* We have to copy the record body to
-				a separate buffer */
-				recs = buf = static_cast<byte*>
-					(ut_malloc_nokey(recv->len));
-				recv_data_copy_to_buf(buf, *recv);
-			} else {
-				buf = NULL;
-				recs = reinterpret_cast<const byte*>
-					(recv->data + 1);
-			}
-
 			recv_parse_or_apply_log_rec_body(
-				recv->type, recs, recs + recv->len,
+				recv->type, recv->recs, recv->recs + recv->len,
 				block->page.id.space(),
 				block->page.id.page_no(), true, block, &mtr);
 
@@ -1962,8 +1860,6 @@ static void recv_recover_page(buf_block_t* block, mtr_t& mtr,
 				mach_write_to_8(FIL_PAGE_LSN + page_zip->data,
 						end_lsn);
 			}
-
-			ut_free(buf);
 		}
 	}
 
@@ -2470,7 +2366,7 @@ recv_report_corrupt_log(
 	ib::error() <<
 		"############### CORRUPT LOG RECORD FOUND ##################";
 
-	const ulint ptr_offset = ulint(ptr - recv_sys.buf);
+	const ulint ptr_offset = ulint(ptr - log_sys.buf);
 
 	ib::info() << "Log record type " << type << ", page " << space << ":"
 		<< page_no << ". Log parsing proceeded successfully up to "
@@ -2480,7 +2376,7 @@ recv_report_corrupt_log(
 		<< ptr_offset << ", prev "
 		<< recv_previous_parsed_rec_offset;
 
-	ut_ad(ptr <= recv_sys.buf + recv_sys.len);
+	ut_ad(ptr <= log_sys.buf + recv_sys.len);
 
 	const ulint	limit	= 100;
 	const ulint	prev_offset = std::min(recv_previous_parsed_rec_offset,
@@ -2491,7 +2387,7 @@ recv_report_corrupt_log(
 	ib::info() << "Hex dump starting " << before << " bytes before and"
 		" ending " << after << " bytes after the corrupted record:";
 
-	const byte* start = recv_sys.buf + prev_offset - before;
+	const byte* start = log_sys.buf + prev_offset - before;
 
 	ut_print_buf(stderr, start, ulint(ptr - start) + after);
 	putc('\n', stderr);
@@ -2548,8 +2444,8 @@ bool recv_parse_log_recs(lsn_t checkpoint_lsn, store_t store, bool apply)
 	ut_ad(mutex_own(&recv_sys.mutex));
 	ut_ad(recv_sys.parse_start_lsn != 0);
 loop:
-	const byte* ptr = recv_sys.buf + recv_sys.recovered_offset;
-	const byte* end_ptr = recv_sys.buf + recv_sys.len;
+	const byte* ptr = log_sys.buf + recv_sys.recovered_offset;
+	const byte* end_ptr = log_sys.buf + recv_sys.len;
 
 	if (ptr == end_ptr) {
 
@@ -2787,7 +2683,7 @@ bool recv_parse_log_recs(lsn_t checkpoint_lsn, store_t store, bool apply)
 
 		/* Add all the records to the hash table */
 
-		ptr = recv_sys.buf + recv_sys.recovered_offset;
+		ptr = log_sys.buf + recv_sys.recovered_offset;
 
 		for (;;) {
 			old_lsn = recv_sys.recovered_lsn;
@@ -2922,7 +2818,7 @@ bool recv_sys_add_to_parsing_buf(const byte* log_block, lsn_t scanned_lsn)
 	ut_ad(start_offset <= end_offset);
 
 	if (start_offset < end_offset) {
-		memcpy(recv_sys.buf + recv_sys.len,
+		memcpy(log_sys.buf + recv_sys.len,
 		       log_block + start_offset, end_offset - start_offset);
 
 		recv_sys.len += end_offset - start_offset;
@@ -2973,7 +2869,6 @@ recv_scan_log_recs(
 	ulint		data_len;
 	bool		more_data	= false;
 	bool		apply		= recv_sys.mlog_checkpoint_lsn != 0;
-	ulint		recv_parsing_buf_size = RECV_PARSING_BUF_SIZE;
 
 	ut_ad(start_lsn % OS_FILE_LOG_BLOCK_SIZE == 0);
 	ut_ad(end_lsn % OS_FILE_LOG_BLOCK_SIZE == 0);
@@ -3441,8 +3336,6 @@ recv_recovery_from_checkpoint_start(lsn_t flush_lsn)
 	be contiguously written. */
 	recv_sys.mlog_checkpoint_lsn = 0;
 
-	ut_ad(RECV_SCAN_SIZE <= srv_log_buffer_size);
-
 	const lsn_t	end_lsn = mach_read_from_8(
 		buf + LOG_CHECKPOINT_END_LSN);
 
-- 
2.24.0.rc1

