From 1b4ae1ba31d5b540b93609222d87edcf98fefaf5 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Marko=20M=C3=A4kel=C3=A4?= <marko.makela@mariadb.com>
Date: Tue, 4 Jul 2017 14:14:11 +0300
Subject: [PATCH] Mariabackup: Write a dummy empty redo log after --prepare

If --prepare does not create a redo log file, there is a potential
problem: Users could copy the prepared files to a data directory
without deleting pre-existing redo log files. It is safer to create
a dummy redo log file. We cannot use the redo log resizing mechanism
for that, because restoring incremental backups would be broken if
the LSN was advanced if --prepare is invoked on this dummy log file.

InnoDB startup will detect this special dummy file at server startup
and treat it as if no redo log file was present.
---
 extra/mariabackup/xtrabackup.cc                    | 100 ++++++++++++++++++++-
 .../mariabackup/xb_file_key_management.result      |   2 +
 .../suite/mariabackup/xb_file_key_management.test  |  12 ++-
 storage/innobase/log/log0recv.cc                   |  41 ++++++++-
 4 files changed, 152 insertions(+), 3 deletions(-)

diff --git a/extra/mariabackup/xtrabackup.cc b/extra/mariabackup/xtrabackup.cc
index 6d5b3055ea6..805e1f861cb 100644
--- a/extra/mariabackup/xtrabackup.cc
+++ b/extra/mariabackup/xtrabackup.cc
@@ -3362,6 +3362,9 @@ xb_set_max_open_files(
 #endif
 }
 
+/** buffer for generating redo log file */
+static byte MY_ALIGNED(OS_FILE_LOG_BLOCK_SIZE) log_hdr[OS_FILE_LOG_BLOCK_SIZE];
+
 /** Implement --backup
 @return	whether the operation succeeded */
 static
@@ -3637,7 +3640,6 @@ xtrabackup_backup_func()
 	}
 
 	/* label it */
-	byte MY_ALIGNED(OS_FILE_LOG_BLOCK_SIZE) log_hdr[OS_FILE_LOG_BLOCK_SIZE];
 	memset(log_hdr, 0, sizeof log_hdr);
 	mach_write_to_4(LOG_HEADER_FORMAT + log_hdr, log_sys->log.format);
 	mach_write_to_8(LOG_HEADER_START_LSN + log_hdr, checkpoint_lsn_start);
@@ -5300,6 +5302,102 @@ xtrabackup_prepare_func(char** argv)
 	if (ok) {
 		char	filename[FN_REFLEN];
 
+		snprintf(filename, sizeof filename, "%s/ib_logfile0",
+			 xtrabackup_target_dir);
+
+		if (xtrabackup_export) {
+			/* --export will already have created redo log files */
+		} else if (FILE* f = fopen(filename, "wb")) {
+			/* Create a dummy redo log file, so that if files are
+			copied without using copy-back, existing redo
+			log file will be replaced. */
+			memset(log_hdr, 0, sizeof log_hdr);
+			mach_write_to_4(LOG_HEADER_FORMAT + log_hdr,
+					LOG_HEADER_FORMAT_CURRENT);
+			mach_write_to_8(LOG_HEADER_START_LSN + log_hdr,
+					srv_start_lsn);
+			strcpy(reinterpret_cast<char*>(LOG_HEADER_CREATOR
+						       + log_hdr),
+			       "Prepare " MYSQL_SERVER_VERSION);
+			log_block_set_checksum(
+				log_hdr,
+				log_block_calc_checksum_crc32(log_hdr));
+			ok = fwrite(log_hdr, 1, sizeof log_hdr, f)
+				== sizeof log_hdr;
+			memset(log_hdr, 0, sizeof log_hdr);
+			mach_write_to_8(log_hdr + LOG_CHECKPOINT_LSN,
+					srv_start_lsn);
+			const unsigned payload_offset = unsigned(srv_start_lsn)
+				& (OS_FILE_LOG_BLOCK_SIZE - 1);
+			mach_write_to_8(log_hdr + LOG_CHECKPOINT_OFFSET,
+					payload_offset | LOG_FILE_HDR_SIZE);
+			mach_write_to_8(log_hdr + LOG_CHECKPOINT_END_LSN,
+					srv_start_lsn);
+			log_block_set_checksum(
+				log_hdr,
+				log_block_calc_checksum_crc32(log_hdr));
+			/* checkpoint page 1 */
+			ok = ok && fwrite(log_hdr, 1, sizeof log_hdr, f)
+				== sizeof log_hdr;
+			memset(log_hdr, 0, sizeof log_hdr);
+			/* two empty pages before payload */
+			ok = ok && fwrite(log_hdr, 1, sizeof log_hdr, f)
+				== sizeof log_hdr
+				&& fwrite(log_hdr, 1, sizeof log_hdr, f)
+				== sizeof log_hdr;
+			const uint block_n = log_block_convert_lsn_to_no(
+				srv_start_lsn);
+			mach_write_to_4(log_hdr + LOG_BLOCK_HDR_NO, block_n);
+			mach_write_to_2(log_hdr + LOG_BLOCK_HDR_DATA_LEN,
+					OS_FILE_LOG_BLOCK_SIZE);
+			mach_write_to_2(log_hdr + LOG_BLOCK_FIRST_REC_GROUP,
+					payload_offset);
+			/* Pad the first block with dummy records, and
+			write the MLOG_CHECKPOINT to the second block. */
+			memset(log_hdr + LOG_BLOCK_HDR_SIZE,
+			       MLOG_DUMMY_RECORD,
+			       OS_FILE_LOG_BLOCK_SIZE
+			       - LOG_BLOCK_TRL_SIZE
+			       - LOG_BLOCK_HDR_SIZE);
+			log_block_set_checksum(
+				log_hdr,
+				log_block_calc_checksum_crc32(log_hdr));
+			ok = ok && fwrite(log_hdr, 1, sizeof log_hdr, f)
+				== sizeof log_hdr;
+
+			mach_write_to_4(log_hdr + LOG_BLOCK_HDR_NO,
+					(block_n + 1) & 0x3fffffff);
+			mach_write_to_2(log_hdr + LOG_BLOCK_HDR_DATA_LEN,
+					LOG_BLOCK_HDR_SIZE
+					+ SIZE_OF_MLOG_CHECKPOINT);
+			mach_write_to_2(log_hdr + LOG_BLOCK_FIRST_REC_GROUP,
+					LOG_BLOCK_HDR_SIZE);
+			/* Write the payload (a MLOG_CHECKPOINT record). */
+			log_hdr[LOG_BLOCK_HDR_SIZE] = MLOG_CHECKPOINT;
+			mach_write_to_8(LOG_BLOCK_HDR_SIZE + 1 + log_hdr,
+					srv_start_lsn);
+			memset(LOG_BLOCK_HDR_SIZE + SIZE_OF_MLOG_CHECKPOINT
+			       + log_hdr, 0,
+			       OS_FILE_LOG_BLOCK_SIZE
+			       - LOG_BLOCK_TRL_SIZE
+			       - LOG_BLOCK_HDR_SIZE
+			       - SIZE_OF_MLOG_CHECKPOINT);
+			log_block_set_checksum(
+				log_hdr,
+				log_block_calc_checksum_crc32(log_hdr));
+			/* payload page */
+			ok = ok && fwrite(log_hdr, 1, sizeof log_hdr, f)
+				== sizeof log_hdr;
+			fclose(f);
+		} else {
+			ok = false;
+		}
+
+		if (!ok) {
+			msg("xtrabackup: cannot initialize empty file %s\n",
+			    filename);
+		}
+
 		strcpy(metadata_type, "log-applied");
 
 		if(xtrabackup_incremental
diff --git a/mysql-test/suite/mariabackup/xb_file_key_management.result b/mysql-test/suite/mariabackup/xb_file_key_management.result
index 8972da32f8b..28a9531ce2a 100644
--- a/mysql-test/suite/mariabackup/xb_file_key_management.result
+++ b/mysql-test/suite/mariabackup/xb_file_key_management.result
@@ -9,6 +9,8 @@ INSERT INTO t VALUES('foobar2');
 # remove datadir
 # xtrabackup move back
 # restart server
+NOT FOUND /foobar1/ in ib_logfile0
+# expect NOT FOUND
 SELECT * FROM t;
 c
 foobar1
diff --git a/mysql-test/suite/mariabackup/xb_file_key_management.test b/mysql-test/suite/mariabackup/xb_file_key_management.test
index 87631404ba5..a37551f7d6d 100644
--- a/mysql-test/suite/mariabackup/xb_file_key_management.test
+++ b/mysql-test/suite/mariabackup/xb_file_key_management.test
@@ -22,8 +22,18 @@ exec $XTRABACKUP  --prepare --target-dir=$targetdir;
 -- source include/restart_and_restore.inc
 --enable_result_log
 
---list_files $targetdir ib_logfile*
+#
+# Recheck that plain text data (
+# in not in the log, after prepare
+# (MDEV-11538)
+
+--let SEARCH_RANGE = 10000000
+--let SEARCH_PATTERN=foobar1
+--let SEARCH_FILE=$targetdir/ib_logfile0
+--source include/search_pattern_in_file.inc
+--echo # expect NOT FOUND
 
 SELECT * FROM t;
 DROP TABLE t;
 rmdir $targetdir;
+
diff --git a/storage/innobase/log/log0recv.cc b/storage/innobase/log/log0recv.cc
index edf04df4e1f..36158954946 100644
--- a/storage/innobase/log/log0recv.cc
+++ b/storage/innobase/log/log0recv.cc
@@ -2794,6 +2794,44 @@ recv_scan_log_recs(
 			break;
 		}
 
+		if (data_len == OS_FILE_LOG_BLOCK_SIZE
+		    && end_lsn == start_lsn + 2 * OS_FILE_LOG_BLOCK_SIZE
+		    && log_block_get_data_len(log_block
+					      + OS_FILE_LOG_BLOCK_SIZE)
+		    == LOG_BLOCK_HDR_SIZE + SIZE_OF_MLOG_CHECKPOINT
+		    && checkpoint_lsn >= (scanned_lsn - data_len
+					  + LOG_BLOCK_HDR_SIZE)
+		    && checkpoint_lsn <= scanned_lsn - LOG_BLOCK_TRL_SIZE
+		    && log_block[OS_FILE_LOG_BLOCK_SIZE + LOG_BLOCK_HDR_SIZE]
+		    == MLOG_CHECKPOINT
+		    && checkpoint_lsn == mach_read_from_8(
+			    OS_FILE_LOG_BLOCK_SIZE + LOG_BLOCK_HDR_SIZE + 1
+			    + log_block)) {
+			/* If the first block consists of
+			MLOG_DUMMY_RECORD only, this should be a dummy
+			redo log file written by Mariabackup --prepare. */
+			const byte* b;
+			const byte* const end = log_block
+				+ (OS_FILE_LOG_BLOCK_SIZE
+				   - LOG_BLOCK_TRL_SIZE);
+			for (b = log_block + LOG_BLOCK_HDR_SIZE;
+			     b < end && *b == MLOG_DUMMY_RECORD;
+			     b++);
+			if (b == end) {
+				/* The redo log is logically empty. */
+				ut_ad(recv_sys->mlog_checkpoint_lsn == 0
+				      || recv_sys->mlog_checkpoint_lsn
+				      == checkpoint_lsn);
+				recv_sys->mlog_checkpoint_lsn = checkpoint_lsn;
+				DBUG_PRINT("ib_log", ("found dummy log; LSN="
+						      LSN_PF, scanned_lsn));
+				*contiguous_lsn = scanned_lsn = checkpoint_lsn
+					+ SIZE_OF_MLOG_CHECKPOINT;
+				finished = true;
+				break;
+			}
+		}
+
 		if (scanned_lsn > recv_sys->scanned_lsn) {
 			ut_ad(!srv_log_files_created);
 			if (!recv_needed_recovery) {
@@ -3244,7 +3282,8 @@ recv_recovery_from_checkpoint_start(lsn_t flush_lsn)
 	there is something wrong we will print a message to the
 	user about recovery: */
 
-	if (flush_lsn == checkpoint_lsn + SIZE_OF_MLOG_CHECKPOINT
+	if ((flush_lsn == checkpoint_lsn + SIZE_OF_MLOG_CHECKPOINT
+	     || contiguous_lsn == checkpoint_lsn + SIZE_OF_MLOG_CHECKPOINT)
 	    && recv_sys->mlog_checkpoint_lsn == checkpoint_lsn) {
 		/* The redo log is logically empty. */
 	} else if (checkpoint_lsn != flush_lsn) {
-- 
2.13.3

