diff --git a/mysql-test/include/default_mysqld.cnf b/mysql-test/include/default_mysqld.cnf
index 14f120e..6276d41 100644
--- a/mysql-test/include/default_mysqld.cnf
+++ b/mysql-test/include/default_mysqld.cnf
@@ -125,6 +125,7 @@ loose-innodb-encryption-threads=4
 # with example_key_management_plugin key-age is measured in seconds...
 loose-innodb-encryption-rotate-key-age=15
 loose-innodb-encrypt-tables=ON
+loose-innodb-encrypt-log=ON
 
 # Set small value of min_gap_size to do a sufficient testing of the related
 # functionality. The flag is marked "loose" because it doesn't exist in
diff --git a/mysql-test/suite/innodb/r/innodb_encryption.result b/mysql-test/suite/innodb/r/innodb_encryption.result
index 040cbd2..283c78e 100644
--- a/mysql-test/suite/innodb/r/innodb_encryption.result
+++ b/mysql-test/suite/innodb/r/innodb_encryption.result
@@ -1,7 +1,7 @@
 SET @start_global_value = @@global.innodb_encryption_threads;
 SHOW VARIABLES LIKE 'innodb_encrypt%';
 Variable_name	Value
-innodb_encrypt_log	OFF
+innodb_encrypt_log	ON
 innodb_encrypt_tables	ON
 innodb_encryption_rotate_key_age	15
 innodb_encryption_rotation_iops	100
@@ -44,7 +44,7 @@ SET GLOBAL innodb_encryption_threads=@start_global_value;
 # Restart mysqld --innodb_encrypt_tables=0 --innodb_encryption_threads=0
 SHOW VARIABLES LIKE 'innodb_encrypt%';
 Variable_name	Value
-innodb_encrypt_log	OFF
+innodb_encrypt_log	ON
 innodb_encrypt_tables	OFF
 innodb_encryption_rotate_key_age	15
 innodb_encryption_rotation_iops	100
diff --git a/mysql-test/suite/sys_vars/r/innodb_encrypt_log_basic.result b/mysql-test/suite/sys_vars/r/innodb_encrypt_log_basic.result
index 4beb1a0..485ba44 100644
--- a/mysql-test/suite/sys_vars/r/innodb_encrypt_log_basic.result
+++ b/mysql-test/suite/sys_vars/r/innodb_encrypt_log_basic.result
@@ -1,13 +1,13 @@
 SELECT @@GLOBAL.innodb_encrypt_log;
 @@GLOBAL.innodb_encrypt_log
-0
+1
 0 Expected
 SET @@GLOBAL.innodb_encrypt_log=1;
 ERROR HY000: Variable 'innodb_encrypt_log' is a read only variable
 Expected error 'Read only variable'
 SELECT @@GLOBAL.innodb_encrypt_log;
 @@GLOBAL.innodb_encrypt_log
-0
+1
 0 Expected
 SELECT IF(@@GLOBAL.innodb_encrypt_log, 'ON', 'OFF') = VARIABLE_VALUE
 FROM INFORMATION_SCHEMA.GLOBAL_VARIABLES
@@ -17,13 +17,13 @@ IF(@@GLOBAL.innodb_encrypt_log, 'ON', 'OFF') = VARIABLE_VALUE
 1 Expected
 SELECT @@GLOBAL.innodb_encrypt_log;
 @@GLOBAL.innodb_encrypt_log
-0
+1
 0 Expected
 SELECT VARIABLE_VALUE
 FROM INFORMATION_SCHEMA.GLOBAL_VARIABLES 
 WHERE VARIABLE_NAME='innodb_encrypt_log';
 VARIABLE_VALUE
-OFF
+ON
 0 Expected
 SELECT @@innodb_encrypt_log = @@GLOBAL.innodb_encrypt_log;
 @@innodb_encrypt_log = @@GLOBAL.innodb_encrypt_log
@@ -31,7 +31,7 @@ SELECT @@innodb_encrypt_log = @@GLOBAL.innodb_encrypt_log;
 1 Expected
 SELECT @@innodb_encrypt_log;
 @@innodb_encrypt_log
-0
+1
 0 Expected
 SELECT COUNT(@@local.innodb_encrypt_log);
 ERROR HY000: Variable 'innodb_encrypt_log' is a GLOBAL variable
@@ -41,7 +41,7 @@ ERROR HY000: Variable 'innodb_encrypt_log' is a GLOBAL variable
 Expected error 'Variable is a GLOBAL variable'
 SELECT @@GLOBAL.innodb_encrypt_log;
 @@GLOBAL.innodb_encrypt_log
-0
+1
 0 Expected
 SELECT innodb_encrypt_log;
 ERROR 42S22: Unknown column 'innodb_encrypt_log' in 'field list'
diff --git a/storage/innobase/include/log0crypt.h b/storage/innobase/include/log0crypt.h
index caac22f..cd42f25 100644
--- a/storage/innobase/include/log0crypt.h
+++ b/storage/innobase/include/log0crypt.h
@@ -9,68 +9,17 @@ Created 11/25/2013 Minli Zhu
 
 #include "univ.i"
 #include "ut0byte.h"
-#include "ut0lst.h"
-#include "ut0rnd.h"
-#include "my_crypt.h"
-#include "my_crypt_key_management.h"	// for key version and key 
-
-#define PURPOSE_BYTE_LEN	AES_128_BLOCK_SIZE - 1
-#define PURPOSE_BYTE_OFFSET	0
-#define UNENCRYPTED_KEY_VER	0
 
 /* If true, enable redo log encryption. */
 extern my_bool srv_encrypt_log;
-/* Plain text used by AES_ECB to generate redo log crypt key. */
-extern byte redo_log_crypt_msg[AES_128_BLOCK_SIZE];
-/* IV to concatenate with counter used by AES_CTR for redo log crypto. */
-extern byte aes_ctr_nonce[AES_128_BLOCK_SIZE];
-
-/*********************************************************************//**
-Generate a 128-bit random message used to generate redo log crypto key.
-Init AES-CTR iv/nonce with random number.
-It is called only when clean startup (i.e., redo logs do not exist). */
-UNIV_INTERN
-void
-log_init_crypt_msg_and_nonce(void);
-/*===============================*/
-/*********************************************************************//**
-Init log_sys redo log crypto key. */
-UNIV_INTERN
-void
-log_init_crypt_key(
-/*===============*/
-	const byte* crypt_msg,		/*< in: crypt msg */
-	const uint crypt_ver,		/*< in: mysqld key version */
-	byte* crypt_key);		/*< out: crypt struct with key and iv */
-/*********************************************************************//**
-Encrypt log blocks. */
-UNIV_INTERN
-CryptResult
-log_blocks_encrypt(
-/*===============*/
-	const byte* blocks,		/*!< in: blocks before encryption */
-	const ulint size,		/*!< in: size of blocks, must be multiple of a log block */
-	byte* dst_blocks);		/*!< out: blocks after encryption */
 
-/*********************************************************************//**
-Decrypt log blocks. */
-UNIV_INTERN
-CryptResult
-log_blocks_decrypt(
-/*===============*/
-	const byte* blocks,		/*!< in: blocks before decryption */
-	const ulint size,		/*!< in: size of blocks, must be multiple of a log block */
-	byte* dst_blocks);		/*!< out: blocks after decryption */
-
-/*********************************************************************//**
-Set next checkpoint's key version to latest one, and generate current
-key. Key version 0 means no encryption. */
+/***********************************************************************
+Set next checkpoint's key version to latest one, and generate new key */
 UNIV_INTERN
 void
 log_crypt_set_ver_and_key(
 /*======================*/
-	uint& key_ver,			/*!< out: latest key version */
-	byte* crypt_key);		/*!< out: crypto key */
+	ib_uint64_t next_checkpoint_no);
 
 /*********************************************************************//**
 Writes the crypto (version, msg and iv) info, which has been used for
@@ -82,4 +31,34 @@ log_crypt_write_checkpoint_buf(
 /*===========================*/
 	byte*	buf);			/*!< in/out: checkpoint buffer */
 
+/*********************************************************************//**
+Read the crypto (version, msg and iv) info, which has been used for
+log blocks with lsn <= this checkpoint's lsn, from a log header's
+checkpoint buf. */
+UNIV_INTERN
+void
+log_crypt_read_checkpoint_buf(
+/*===========================*/
+	const byte*	buf);			/*!< in: checkpoint buffer */
+
+/********************************************************
+Encrypt one or more log block before it is flushed to disk */
+UNIV_INTERN
+void
+log_encrypt_before_write(
+/*===========================*/
+	ib_uint64_t next_checkpoint_no,	/*!< in: log group to be flushed */
+	byte* block,			/*!< in/out: pointer to a log block */
+	const ulint size);		/*!< in: size of log blocks */
+
+/********************************************************
+Decrypt a specified log segment after they are read from a log file to a buffer.
+*/
+UNIV_INTERN
+void
+log_decrypt_after_read(
+/*==========================*/
+	byte* frame,	/*!< in/out: log segment */
+	const ulint size);	/*!< in: log segment size */
+
 #endif  // log0crypt.h
diff --git a/storage/innobase/include/log0log.h b/storage/innobase/include/log0log.h
index 6d0fb6a..44dc033 100644
--- a/storage/innobase/include/log0log.h
+++ b/storage/innobase/include/log0log.h
@@ -672,19 +672,15 @@ extern log_t*	log_sys;
 #endif
 #define LOG_CHECKPOINT_OFFSET_HIGH32	(16 + LOG_CHECKPOINT_ARRAY_END)
 #define LOG_CRYPT_VER			(20 + LOG_CHECKPOINT_ARRAY_END)
-					/*!< 32-bit key version. Corresponding
-					key has been used for log records with                                        
-					lsn <= the checkpoint' lsn */
-#define LOG_CRYPT_MSG			(24 + LOG_CHECKPOINT_ARRAY_END)
-					/*!< a 128-bit value used to
-					derive cryto key for redo log.
-					It is generated via the concatenation
-					of 1 purpose byte T (0x02) and a
-					15-byte random number.*/
-#define LOG_CRYPT_IV			(40 + LOG_CHECKPOINT_ARRAY_END)
-					/*!< a 128-bit random number used as
-					AES-CTR iv/nonce for redo log */
-#define LOG_CHECKPOINT_SIZE		(56 + LOG_CHECKPOINT_ARRAY_END)
+
+#define LOG_CRYPT_MAX_ENTRIES           (5)
+#define LOG_CRYPT_ENTRY_SIZE            (4 + 4 + 2 * AES_128_BLOCK_SIZE)
+#define LOG_CRYPT_SIZE                  (1 + 1 +			\
+					 (LOG_CRYPT_MAX_ENTRIES *	\
+					  LOG_CRYPT_ENTRY_SIZE))
+
+#define LOG_CHECKPOINT_SIZE		(20 + LOG_CHECKPOINT_ARRAY_END + \
+					 LOG_CRYPT_SIZE)
 
 /* Offsets of a log file header */
 #define LOG_GROUP_ID		0	/* log group number */
@@ -789,10 +785,6 @@ struct log_t{
 	lsn_t		lsn;		/*!< log sequence number */
 	ulint		buf_free;	/*!< first free offset within the log
 					buffer */
-	uint		redo_log_crypt_ver;
-					/*!< 32-bit crypto ver */
-	byte		redo_log_crypt_key[AES_128_BLOCK_SIZE];
-					/*!< crypto key to encrypt redo log */
 #ifndef UNIV_HOTBACKUP
 	ib_mutex_t		mutex;		/*!< mutex protecting the log */
 
diff --git a/storage/innobase/include/log0recv.h b/storage/innobase/include/log0recv.h
index 5ba35ff..cd7e969 100644
--- a/storage/innobase/include/log0recv.h
+++ b/storage/innobase/include/log0recv.h
@@ -442,11 +442,6 @@ struct recv_sys_t{
 				scan find a corrupt log block, or a corrupt
 				log record, or there is a log parsing
 				buffer overflow */
-	uint		recv_log_crypt_ver;
-				/*!< mysqld key version to generate redo
-				log crypt key for recovery */
-	byte		recv_log_crypt_key[AES_128_BLOCK_SIZE];
-				/*!< crypto key to decrypt redo log for recovery */
 #ifdef UNIV_LOG_ARCHIVE
 	log_group_t*	archive_group;
 				/*!< in archive recovery: the log group whose
diff --git a/storage/innobase/log/log0crypt.cc b/storage/innobase/log/log0crypt.cc
index 8517e80..a683459 100644
--- a/storage/innobase/log/log0crypt.cc
+++ b/storage/innobase/log/log0crypt.cc
@@ -9,6 +9,18 @@ Created 11/25/2013 Minli Zhu
 #include "srv0start.h" // for srv_start_lsn
 #include "log0recv.h"  // for recv_sys
 
+#include "my_crypt.h"
+#include "my_crypt_key_management.h"	// for key version and key
+
+#define UNENCRYPTED_KEY_VER	0
+
+/* If true, enable redo log encryption. */
+extern my_bool srv_encrypt_log;
+
+
+#include <algorithm>    // std::sort
+#include <deque>
+
 /* If true, enable redo log encryption. */
 UNIV_INTERN my_bool srv_encrypt_log = FALSE;
 /*
@@ -16,100 +28,21 @@ UNIV_INTERN my_bool srv_encrypt_log = FALSE;
   Set and used to validate crypto msg.
 */
 static const byte redo_log_purpose_byte = 0x02;
-/* Plain text used by AES_ECB to generate redo log crypt key. */
-byte redo_log_crypt_msg[AES_128_BLOCK_SIZE] = {0};
-/* IV to concatenate with counter used by AES_CTR for redo log
- * encryption/decryption. */
-byte aes_ctr_nonce[AES_128_BLOCK_SIZE] = {0};
 
-/*********************************************************************//**
-Generate a 128-bit value used to generate crypt key for redo log.
-It is generated via the concatenation of 1 purpose byte (0x02) and 15-byte
-random number.
-Init AES-CTR iv/nonce with random number.
-It is called when:
-- redo logs do not exist when start up, or
-- transition from without crypto.
-Note:
-We should not use flags and conditions such as:
-	(srv_encrypt_log &&
-	 opt_danger_danger_use_dbug_keys &&
-	 GetLatestCryptoKeyVersion() == UNENCRYPTED_KEY_VER)
-because they haven't been read and set yet in the situation of resetting
-redo logs.
+/*
+  Store this many keys into each checkpoint info
 */
-UNIV_INTERN
-void
-log_init_crypt_msg_and_nonce(void)
-/*==============================*/
-{
-	mach_write_to_1(redo_log_crypt_msg, redo_log_purpose_byte);
-	if (RandomBytes(redo_log_crypt_msg + 1, PURPOSE_BYTE_LEN) != CRYPT_OK)
-	{
-		fprintf(stderr,
-			"\nInnodb redo log crypto: generate "
-			"%u-byte random number as crypto msg failed.\n",
-			PURPOSE_BYTE_LEN);
-		abort();
-	}
-
-	if (RandomBytes(aes_ctr_nonce, AES_128_BLOCK_SIZE) != CRYPT_OK)
-	{
-		fprintf(stderr,
-			"\nInnodb redo log crypto: generate "
-			"%u-byte random number as AES_CTR nonce failed.\n",
-			AES_128_BLOCK_SIZE);
-		abort();
-	}
-}
-
-/*********************************************************************//**
-Generate crypt key from crypt msg. */
-UNIV_INTERN
-void
-log_init_crypt_key(
-/*===============*/
-	const byte* crypt_msg,		/*< in: crypt msg */
-	const uint crypt_ver,		/*< in: key version */
-	byte* key)			/*< out: crypt key*/
-{
-	if (crypt_ver == UNENCRYPTED_KEY_VER)
-	{
-		fprintf(stderr, "\nInnodb redo log crypto: unencrypted key ver.\n\n");
-		memset(key, 0, AES_128_BLOCK_SIZE);
-		return;
-	}
+static const size_t kMaxSavedKeys = LOG_CRYPT_MAX_ENTRIES;
 
-	if (crypt_msg[PURPOSE_BYTE_OFFSET] != redo_log_purpose_byte)
-	{
-		fprintf(stderr,
-			"\nInnodb redo log crypto: msg type mismatched. "
-			"Expected: %x; Actual: %x\n",
-			redo_log_purpose_byte, crypt_msg[PURPOSE_BYTE_OFFSET]);
-		abort();
-	}
+struct crypt_info_t {
+	ulong           checkpoint_no; /*!< checkpoint no */
+	uint		key_version;   /*!< mysqld key version */
+	byte            crypt_msg[AES_128_BLOCK_SIZE];
+	byte		crypt_key[AES_128_BLOCK_SIZE];
+	byte            crypt_nonce[AES_128_BLOCK_SIZE];
+};
 
-	byte mysqld_key[AES_128_BLOCK_SIZE] = {0};
-	if (GetCryptoKey(crypt_ver, mysqld_key, AES_128_BLOCK_SIZE))
-	{
-		fprintf(stderr,
-			"\nInnodb redo log crypto: getting mysqld crypto key "
-			"from key version failed.\n");
-		abort();
-	}
-
-	int dst_len;
-	int rc = EncryptAes128Ecb(mysqld_key, //key
-		crypt_msg, AES_128_BLOCK_SIZE, //src, srclen
-		key, &dst_len); //dst, &dstlen
-	if (rc != CRYPT_OK || dst_len != AES_128_BLOCK_SIZE)
-	{
-		fprintf(stderr,
-			"\nInnodb redo log crypto: getting redo log crypto key "
-			"failed.\n");
-		abort();
-	}
-}
+static std::deque<crypt_info_t> crypt_info;
 
 /*********************************************************************//**
 Get a log block's start lsn.
@@ -120,6 +53,7 @@ log_block_get_start_lsn(
 /*====================*/
 	lsn_t lsn,			/*!< in: checkpoint lsn */
 	ulint log_block_no)		/*!< in: log block number */
+/*==========================*/
 {
 	lsn_t start_lsn =
 		(lsn & 0xffffffff00000000) |
@@ -127,41 +61,79 @@ log_block_get_start_lsn(
 	return start_lsn;
 }
 
+static
+const crypt_info_t*
+get_crypt_info(ib_uint64_t checkpoint_no)
+/*========================================*/
+{
+	/* so that no one is modifying array while we search */
+	ut_ad(mutex_own(&(log_sys->mutex)));
+
+	/* a log block only stores 4-bytes of checkpoint no */
+	checkpoint_no &= 0xFFFFFFFF;
+	for (size_t i = 0; i < crypt_info.size(); i++) {
+		struct crypt_info_t* it = &crypt_info[i];
+
+		if (it->checkpoint_no == checkpoint_no) {
+			return it;
+		}
+	}
+	return NULL;
+}
+
+static
+const crypt_info_t*
+get_crypt_info(const byte* log_block) {
+/*====================================*/
+	ib_uint64_t checkpoint_no = log_block_get_checkpoint_no(log_block);
+	return get_crypt_info(checkpoint_no);
+}
+
 /*********************************************************************//**
 Call AES CTR to encrypt/decrypt log blocks. */
 static
 CryptResult
 log_blocks_crypt(
 /*=============*/
-	const byte* block,		/*!< in: blocks before encrypt/decrypt*/
-	const ulint size,		/*!< in: size of block, must be multiple of a log block*/
-	byte* dst_block,		/*!< out: blocks after encrypt/decrypt */
-	const bool is_encrypt)		/*!< in: encrypt or decrypt*/
+	const byte* block,	/*!< in: blocks before encrypt/decrypt*/
+	ulint size,		/*!< in: size of block */
+	byte* dst_block,	/*!< out: blocks after encrypt/decrypt */
+	bool is_encrypt)	/*!< in: encrypt or decrypt*/
 {
 	byte *log_block = (byte*)block;
 	CryptResult rc = CRYPT_OK;
-	int src_len, dst_len;
+	int dst_len;
 	byte aes_ctr_counter[AES_128_BLOCK_SIZE];
-	ulint log_block_no, log_block_start_lsn;
-	byte *key;
 	ulint lsn;
-	if (is_encrypt)
-	{
-		ut_a(log_sys && log_sys->redo_log_crypt_ver != UNENCRYPTED_KEY_VER);
-		key = (byte *)(log_sys->redo_log_crypt_key);
-		lsn = log_sys->lsn;
 
+	if (is_encrypt) {
+		lsn = log_sys->lsn;
 	} else {
-		ut_a(recv_sys && recv_sys->recv_log_crypt_ver != UNENCRYPTED_KEY_VER);
-		key = (byte *)(recv_sys->recv_log_crypt_key);
 		lsn = srv_start_lsn;
 	}
+
 	ut_a(size % OS_FILE_LOG_BLOCK_SIZE == 0);
-	src_len = OS_FILE_LOG_BLOCK_SIZE - LOG_BLOCK_HDR_SIZE;
-	for (ulint i = 0; i < size ; i += OS_FILE_LOG_BLOCK_SIZE)
-	{
-		log_block_no = log_block_get_hdr_no(log_block);
-		log_block_start_lsn = log_block_get_start_lsn(lsn, log_block_no);
+	const int src_len = OS_FILE_LOG_BLOCK_SIZE - LOG_BLOCK_HDR_SIZE;
+	for (ulint i = 0; i < size ; i += OS_FILE_LOG_BLOCK_SIZE) {
+		ulint log_block_no = log_block_get_hdr_no(log_block);
+		ulint log_block_start_lsn = log_block_get_start_lsn(
+			lsn, log_block_no);
+
+		const crypt_info_t* info = get_crypt_info(log_block);
+#ifdef DEBUG_CRYPT
+		fprintf(stderr,
+			"%s %lu chkpt: %lu key: %u lsn: %lu\n",
+			is_encrypt ? "crypt" : "decrypt",
+			log_block_no,
+			log_block_get_checkpoint_no(log_block),
+			info ? info->key_version : 0,
+			log_block_start_lsn);
+#endif
+		if (info == NULL ||
+		    info->key_version == UNENCRYPTED_KEY_VER) {
+			memcpy(dst_block, log_block, OS_FILE_LOG_BLOCK_SIZE);
+			goto next;
+		}
 
 		// Assume log block header is not encrypted
 		memcpy(dst_block, log_block, LOG_BLOCK_HDR_SIZE);
@@ -170,15 +142,28 @@ log_blocks_crypt(
 		// (8-byte) + lbn (4-byte) + abn
 		// (1-byte, only 5 bits are used). "+" means concatenate.
 		bzero(aes_ctr_counter, AES_128_BLOCK_SIZE);
-		memcpy(aes_ctr_counter, &aes_ctr_nonce, 3);
+		memcpy(aes_ctr_counter, info->crypt_nonce, 3);
 		mach_write_to_8(aes_ctr_counter + 3, log_block_start_lsn);
 		mach_write_to_4(aes_ctr_counter + 11, log_block_no);
 		bzero(aes_ctr_counter + 15, 1);
-		rc = EncryptAes128Ctr(key, aes_ctr_counter, AES_128_BLOCK_SIZE, // key, counter, block size
-			log_block + LOG_BLOCK_HDR_SIZE, src_len, // src, src_len
-			dst_block + LOG_BLOCK_HDR_SIZE, &dst_len); // dst, dst_len
+		if (is_encrypt) {
+			rc = EncryptAes128Ctr(
+				info->crypt_key,
+				aes_ctr_counter, AES_128_BLOCK_SIZE,
+				log_block + LOG_BLOCK_HDR_SIZE, src_len,
+				dst_block + LOG_BLOCK_HDR_SIZE, &dst_len);
+		} else {
+			rc = DecryptAes128Ctr(
+				info->crypt_key,
+				aes_ctr_counter, AES_128_BLOCK_SIZE,
+				log_block + LOG_BLOCK_HDR_SIZE, src_len,
+				dst_block + LOG_BLOCK_HDR_SIZE, &dst_len);
+		}
+
 		ut_a(rc == CRYPT_OK);
 		ut_a(dst_len == src_len);
+
+next:
 		log_block += OS_FILE_LOG_BLOCK_SIZE;
 		dst_block += OS_FILE_LOG_BLOCK_SIZE;
 	}
@@ -187,29 +172,68 @@ log_blocks_crypt(
 }
 
 /*********************************************************************//**
-Encrypt log blocks. */
-UNIV_INTERN
-CryptResult
-log_blocks_encrypt(
+Generate crypt key from crypt msg. */
+static
+void
+init_crypt_key(
 /*===============*/
-	const byte* block,		/*!< in: blocks before encryption */
-	const ulint size,		/*!< in: size of blocks, must be multiple of a log block */
-	byte* dst_block)		/*!< out: blocks after encryption */
+	crypt_info_t* info)		/*< in/out: crypt info */
 {
-	return log_blocks_crypt(block, size, dst_block, true);
+	if (info->key_version == UNENCRYPTED_KEY_VER) {
+		memset(info->crypt_key, 0, sizeof(info->crypt_key));
+		memset(info->crypt_msg, 0, sizeof(info->crypt_msg));
+		memset(info->crypt_nonce, 0, sizeof(info->crypt_nonce));
+		return;
+	}
+
+	byte mysqld_key[AES_128_BLOCK_SIZE] = {0};
+	if (GetCryptoKey(info->key_version, mysqld_key, AES_128_BLOCK_SIZE)) {
+		fprintf(stderr,
+			"\nInnodb redo log crypto: getting mysqld crypto key "
+			"from key version failed.\n");
+		ut_error;
+	}
+
+	int dst_len;
+	int rc = EncryptAes128Ecb(
+		mysqld_key, //key
+		info->crypt_msg, sizeof(info->crypt_msg), //src, srclen
+		info->crypt_key, &dst_len); //dst, &dstlen
+	if (rc != CRYPT_OK || dst_len != AES_128_BLOCK_SIZE) {
+		fprintf(stderr,
+			"\nInnodb redo log crypto: getting redo log crypto key "
+			"failed.\n");
+		ut_error;
+	}
 }
 
-/*********************************************************************//**
-Decrypt log blocks. */
-UNIV_INTERN
-CryptResult
-log_blocks_decrypt(
-/*===============*/
-	const byte* block,		/*!< in: blocks before decryption */
-	const ulint size,		/*!< in: size of blocks, must be multiple of a log block */
-	byte* dst_block)		/*!< out: blocks after decryption */
+static bool mysort(const crypt_info_t& i,
+		   const crypt_info_t& j)
 {
-	return log_blocks_crypt(block, size, dst_block, false);
+	return i.checkpoint_no > j.checkpoint_no;
+}
+
+static
+bool add_crypt_info(crypt_info_t* info)
+{
+	/* so that no one is searching array while we modify it */
+	ut_ad(mutex_own(&(log_sys->mutex)));
+
+	if (get_crypt_info(info->checkpoint_no) != NULL) {
+		// already present...
+		return false;
+	}
+
+	init_crypt_key(info);
+	crypt_info.push_back(*info);
+
+	/* a log block only stores 4-bytes of checkpoint no */
+	crypt_info.back().checkpoint_no &= 0xFFFFFFFF;
+
+	// keep keys sorted, assuming that last added key will be used most
+	std::sort(crypt_info.begin(), crypt_info.end(), mysort);
+
+	return true;
 }
 
 /*********************************************************************//**
@@ -219,17 +243,104 @@ UNIV_INTERN
 void
 log_crypt_set_ver_and_key(
 /*======================*/
-	uint& key_ver,			/*!< out: latest key version */
-	byte* crypt_key)		/*!< out: crypto key */
+	ib_uint64_t next_checkpoint_no)
+{
+	crypt_info_t info;
+	info.checkpoint_no = next_checkpoint_no;
+
+	if (!srv_encrypt_log) {
+		info.key_version = UNENCRYPTED_KEY_VER;
+	} else {
+		info.key_version = GetLatestCryptoKeyVersion();
+	}
+
+	if (info.key_version == UNENCRYPTED_KEY_VER) {
+		memset(info.crypt_msg, 0, sizeof(info.crypt_msg));
+		memset(info.crypt_nonce, 0, sizeof(info.crypt_nonce));
+	} else {
+		if (RandomBytes(info.crypt_msg, AES_128_BLOCK_SIZE) !=
+		    CRYPT_OK) {
+			fprintf(stderr,
+				"\nInnodb redo log crypto: generate "
+				"%u-byte random number as crypto msg failed.\n",
+				AES_128_BLOCK_SIZE);
+			ut_error;
+		}
+
+		if (RandomBytes(info.crypt_nonce, AES_128_BLOCK_SIZE) !=
+		    CRYPT_OK) {
+			fprintf(stderr,
+				"\nInnodb redo log crypto: generate %u-byte "
+				"random number as AES_CTR nonce failed.\n",
+				AES_128_BLOCK_SIZE);
+			ut_error;
+		}
+
+	}
+
+	add_crypt_info(&info);
+}
+
+/********************************************************
+Encrypt one or more log block before it is flushed to disk */
+UNIV_INTERN
+void
+log_encrypt_before_write(
+/*===========================*/
+	ib_uint64_t next_checkpoint_no,	/*!< in: log group to be flushed */
+	byte* block,			/*!< in/out: pointer to a log block */
+	const ulint size)		/*!< in: size of log blocks */
 {
-	if (!srv_encrypt_log ||
-	    (key_ver = GetLatestCryptoKeyVersion()) == UNENCRYPTED_KEY_VER)
-	{
-		key_ver = UNENCRYPTED_KEY_VER;
-		memset(crypt_key, 0, AES_128_BLOCK_SIZE);
+	ut_ad(size % OS_FILE_LOG_BLOCK_SIZE == 0);
+
+	const crypt_info_t* info = get_crypt_info(next_checkpoint_no);
+	if (info == NULL) {
+		return;
+	}
+
+	if (info->key_version == UNENCRYPTED_KEY_VER) {
 		return;
 	}
-	log_init_crypt_key(redo_log_crypt_msg, key_ver, crypt_key);
+
+	byte* dst_frame = (byte*)malloc(size);
+
+	//encrypt log blocks content
+	CryptResult result = log_blocks_crypt(block, size, dst_frame, true);
+
+	if (result == CRYPT_OK) {
+		ut_ad(block[0] == dst_frame[0]);
+		memcpy(block, dst_frame, size);
+	}
+	free(dst_frame);
+
+	if (unlikely(result != CRYPT_OK)) {
+		ut_error;
+	}
+}
+
+/********************************************************
+Decrypt a specified log segment after they are read from a log file to a buffer.
+*/
+void
+log_decrypt_after_read(
+/*==========================*/
+	byte* frame,	        /*!< in/out: log segment */
+	const ulint size)	/*!< in: log segment size */
+{
+	ut_ad(size % OS_FILE_LOG_BLOCK_SIZE == 0);
+	byte* dst_frame = (byte*)malloc(size);
+
+	// decrypt log blocks content
+	CryptResult result = log_blocks_crypt(frame, size, dst_frame, false);
+
+	if (result == CRYPT_OK) {
+		memcpy(frame, dst_frame, size);
+	}
+	free(dst_frame);
+
+	if (unlikely(result != CRYPT_OK)) {
+		ut_error;
+	}
 }
 
 /*********************************************************************//**
@@ -242,15 +353,98 @@ log_crypt_write_checkpoint_buf(
 /*===========================*/
 	byte*	buf)			/*!< in/out: checkpoint buffer */
 {
-	ut_a(log_sys);
-	mach_write_to_4(buf + LOG_CRYPT_VER, log_sys->redo_log_crypt_ver);
-	if (!srv_encrypt_log ||
-	    log_sys->redo_log_crypt_ver == UNENCRYPTED_KEY_VER) {
-		memset(buf + LOG_CRYPT_MSG, 0, AES_128_BLOCK_SIZE);
-		memset(buf + LOG_CRYPT_IV, 0, AES_128_BLOCK_SIZE);
+	byte *save = buf;
+
+	// Only write kMaxSavedKeys (sort keys to remove oldest)
+	std::sort(crypt_info.begin(), crypt_info.end(), mysort);
+	while (crypt_info.size() > kMaxSavedKeys) {
+		crypt_info.pop_back();
+	}
+
+	bool encrypted = false;
+	for (size_t i = 0; i < crypt_info.size(); i++) {
+                const crypt_info_t & it = crypt_info[i];
+		if (it.key_version != UNENCRYPTED_KEY_VER) {
+			encrypted = true;
+			break;
+		}
+	}
+
+	if (encrypted == false) {
+		// if no encryption is inuse then zero out
+		// crypt data for upward/downward compability
+		memset(buf + LOG_CRYPT_VER, 0, LOG_CRYPT_SIZE);
 		return;
 	}
-	ut_a(redo_log_crypt_msg[PURPOSE_BYTE_OFFSET] == redo_log_purpose_byte);
-	memcpy(buf + LOG_CRYPT_MSG, redo_log_crypt_msg, AES_128_BLOCK_SIZE);
-	memcpy(buf + LOG_CRYPT_IV, aes_ctr_nonce, AES_128_BLOCK_SIZE);
+
+	ib_uint64_t checkpoint_no = mach_read_from_8(buf + LOG_CHECKPOINT_NO);
+	buf += LOG_CRYPT_VER;
+
+	mach_write_to_1(buf + 0, redo_log_purpose_byte);
+	mach_write_to_1(buf + 1, crypt_info.size());
+	buf += 2;
+	for (size_t i = 0; i < crypt_info.size(); i++) {
+		struct crypt_info_t* it = &crypt_info[i];
+		mach_write_to_4(buf + 0, it->checkpoint_no);
+		mach_write_to_4(buf + 4, it->key_version);
+		memcpy(buf + 8, it->crypt_msg, AES_128_BLOCK_SIZE);
+		memcpy(buf + 24, it->crypt_nonce, AES_128_BLOCK_SIZE);
+		buf += LOG_CRYPT_ENTRY_SIZE;
+	}
+
+#ifdef DEBUG_CRYPT
+	fprintf(stderr, "write chk: %lu [ chk key ]: ", checkpoint_no);
+	for (size_t i = 0; i < crypt_info.size(); i++) {
+		struct crypt_info_t* it = &crypt_info[i];
+		fprintf(stderr, "[ %lu %u ] ",
+			it->checkpoint_no,
+			it->key_version);
+	}
+	fprintf(stderr, "\n");
+#else
+	(void)checkpoint_no; // unused variable
+#endif
+        ut_a((buf - save) <= OS_FILE_LOG_BLOCK_SIZE);
+}
+
+/*********************************************************************//**
+Read the crypto (version, msg and iv) info, which has been used for
+log blocks with lsn <= this checkpoint's lsn, from a log header's
+checkpoint buf. */
+UNIV_INTERN
+void
+log_crypt_read_checkpoint_buf(
+/*===========================*/
+	const byte*	buf) {			/*!< in: checkpoint buffer */
+
+	buf += LOG_CRYPT_VER;
+
+	byte scheme = buf[0];
+	if (scheme != redo_log_purpose_byte) {
+		return;
+	}
+	buf++;
+	size_t n = buf[0];
+	buf++;
+
+	for (size_t i = 0; i < n; i++) {
+		struct crypt_info_t info;
+		info.checkpoint_no = mach_read_from_4(buf + 0);
+		info.key_version = mach_read_from_4(buf + 4);
+		memcpy(info.crypt_msg, buf + 8, AES_128_BLOCK_SIZE);
+		memcpy(info.crypt_nonce, buf + 24, AES_128_BLOCK_SIZE);
+		add_crypt_info(&info);
+		buf += LOG_CRYPT_ENTRY_SIZE;
+	}
+
+#ifdef DEBUG_CRYPT
+	fprintf(stderr, "read [ chk key ]: ");
+	for (size_t i = 0; i < crypt_info.size(); i++) {
+		struct crypt_info_t* it = &crypt_info[i];
+		fprintf(stderr, "[ %lu %u ] ",
+			it->checkpoint_no,
+			it->key_version);
+	}
+	fprintf(stderr, "\n");
+#endif
 }
diff --git a/storage/innobase/log/log0log.cc b/storage/innobase/log/log0log.cc
index 9456e74..d9480fa 100644
--- a/storage/innobase/log/log0log.cc
+++ b/storage/innobase/log/log0log.cc
@@ -899,7 +899,6 @@ log_init(void)
 	/*----------------------------*/
 
 	log_sys->next_checkpoint_no = 0;
-	log_sys->redo_log_crypt_ver = UNENCRYPTED_KEY_VER;
 	log_sys->last_checkpoint_lsn = log_sys->lsn;
 	log_sys->n_pending_checkpoint_writes = 0;
 
@@ -1293,36 +1292,6 @@ log_block_store_checksum(
 }
 
 /******************************************************//**
-Encrypt one or more log block before it is flushed to disk
-@return true if encryption succeeds. */
-static
-bool
-log_group_encrypt_before_write(
-/*===========================*/
-	const log_group_t* group,	/*!< in: log group to be flushed */
-	byte* block,			/*!< in/out: pointer to a log block */
-	const ulint size)		/*!< in: size of log blocks */
-
-{
-	CryptResult result = CRYPT_OK;
-
-	ut_ad(size % OS_FILE_LOG_BLOCK_SIZE == 0);
-	byte* dst_frame = (byte*)malloc(size);
-
-	//encrypt log blocks content
-	result = log_blocks_encrypt(block, size, dst_frame);
-
-	if (result == CRYPT_OK)
-	{
-		ut_ad(block[0] == dst_frame[0]);
-		memcpy(block, dst_frame, size);
-	}
-	free(dst_frame);
-
-	return (result == CRYPT_OK);
-}
-
-/******************************************************//**
 Writes a buffer to a log file group. */
 UNIV_INTERN
 void
@@ -1428,14 +1397,8 @@ loop:
 
 		ut_a(next_offset / UNIV_PAGE_SIZE <= ULINT_MAX);
 
-		if (srv_encrypt_log &&
-		    log_sys->redo_log_crypt_ver != UNENCRYPTED_KEY_VER &&
-		    !log_group_encrypt_before_write(group, buf, write_len))
-		{
-			fprintf(stderr,
-				"\nInnodb redo log encryption failed.\n");
-			abort();
-		}
+		log_encrypt_before_write(log_sys->next_checkpoint_no,
+					 buf, write_len);
 
 		fil_io(OS_FILE_WRITE | OS_FILE_LOG, true, group->space_id, 0,
 		       (ulint) (next_offset / UNIV_PAGE_SIZE),
@@ -2198,12 +2161,15 @@ log_checkpoint(
 	}
 #endif /* UNIV_DEBUG */
 
+	/* generate key version and key used to encrypt future blocks,
+	*
+	* NOTE: the +1 is as the next_checkpoint_no will be updated once
+	* the checkpoint info has been written and THEN blocks will be encrypted
+	* with new key
+	*/
+	log_crypt_set_ver_and_key(log_sys->next_checkpoint_no + 1);
 	log_groups_write_checkpoint_info();
 
-	/* generate key version and key used to encrypt next log block */
-	log_crypt_set_ver_and_key(log_sys->redo_log_crypt_ver,
-				  log_sys->redo_log_crypt_key);
-
 	MONITOR_INC(MONITOR_NUM_CHECKPOINT);
 
 	mutex_exit(&(log_sys->mutex));
@@ -2337,33 +2303,6 @@ loop:
 }
 
 /******************************************************//**
-Decrypt a specified log segment after they are read from a log file to a buffer.
-@return true if decryption succeeds. */
-static
-bool
-log_group_decrypt_after_read(
-/*==========================*/
-	const log_group_t* group,	/*!< in: log group to be read from */
-	byte* frame,	/*!< in/out: log segment */
-	const ulint size)	/*!< in: log segment size */
-{
-	CryptResult result;
-	ut_ad(size % OS_FILE_LOG_BLOCK_SIZE == 0);
-	byte* dst_frame = (byte*)malloc(size);
-
-	// decrypt log blocks content
-	result = log_blocks_decrypt(frame, size, dst_frame);
-
-	if (result == CRYPT_OK)
-	{
-		memcpy(frame, dst_frame, size);
-	}
-	free(dst_frame);
-
-	return (result == CRYPT_OK);
-}
-
-/******************************************************//**
 Reads a specified log segment to a buffer. */
 UNIV_INTERN
 void
@@ -2416,12 +2355,7 @@ loop:
 	       (ulint) (source_offset % UNIV_PAGE_SIZE),
 	       len, buf, NULL);
 
-	if (recv_sys->recv_log_crypt_ver != UNENCRYPTED_KEY_VER &&
-	    !log_group_decrypt_after_read(group, buf, len))
-	{
-		fprintf(stderr, "Innodb redo log decryption failed.\n");
-		abort();
-	}
+	log_decrypt_after_read(buf, len);
 
 	start_lsn += len;
 	buf += len;
@@ -2646,13 +2580,8 @@ loop:
 
 	MONITOR_INC(MONITOR_LOG_IO);
 
-	if (srv_encrypt_log &&
-	    log_sys->redo_log_crypt_ver != UNENCRYPTED_KEY_VER &&
-	    !log_group_encrypt_before_write(group, buf, len))
-	{
-		fprintf(stderr, "Innodb redo log encryption failed.\n");
-		abort();
-	}
+	//TODO (jonaso): This must be dead code??
+	log_group_encrypt_before_write(log_sys->next_checkpoint_no, buf, len);
 
 	fil_io(OS_FILE_WRITE | OS_FILE_LOG, false, group->archive_space_id,
 	       (ulint) (next_offset / UNIV_PAGE_SIZE),
diff --git a/storage/innobase/log/log0recv.cc b/storage/innobase/log/log0recv.cc
index 99a5a98..eb72e5a 100644
--- a/storage/innobase/log/log0recv.cc
+++ b/storage/innobase/log/log0recv.cc
@@ -802,6 +802,7 @@ recv_find_max_checkpoint(
 				buf + LOG_CHECKPOINT_OFFSET_HIGH32)) << 32;
 			checkpoint_no = mach_read_from_8(
 				buf + LOG_CHECKPOINT_NO);
+			log_crypt_read_checkpoint_buf(buf);
 
 #ifdef UNIV_DEBUG
 			if (log_debug_writes) {
@@ -932,6 +933,12 @@ log_block_checksum_is_ok_or_old_format(
 		return(TRUE);
 	}
 
+	fprintf(stderr, "BROKEN: block: %lu checkpoint: %lu %.8lx %.8lx\n",
+		log_block_get_hdr_no(block),
+		log_block_get_checkpoint_no(block),
+		log_block_calc_checksum(block),
+		log_block_get_checksum(block));
+
 	return(FALSE);
 }
 
@@ -2667,6 +2674,13 @@ recv_scan_log_recs(
 
 			finished = TRUE;
 
+			/* Crash if we encounter a garbage log block */
+			if (!srv_force_recovery) {
+				fputs("InnoDB: Set innodb_force_recovery"
+				      " to ignore this error.\n", stderr);
+				ut_error;
+			}
+
 			break;
 		}
 
@@ -2949,7 +2963,6 @@ recv_recovery_from_checkpoint_start_func(
 	ulint		max_cp_field;
 	lsn_t		checkpoint_lsn;
 	ib_uint64_t	checkpoint_no;
-	uint		recv_crypt_ver;
 	lsn_t		group_scanned_lsn = 0;
 	lsn_t		contiguous_lsn;
 #ifdef UNIV_LOG_ARCHIVE
@@ -3009,14 +3022,6 @@ recv_recovery_from_checkpoint_start_func(
 #ifdef UNIV_LOG_ARCHIVE
 	archived_lsn = mach_read_from_8(buf + LOG_CHECKPOINT_ARCHIVED_LSN);
 #endif /* UNIV_LOG_ARCHIVE */
-	recv_crypt_ver = mach_read_from_4(buf + LOG_CRYPT_VER);
-	if (recv_crypt_ver == UNENCRYPTED_KEY_VER)
-	{
-		log_init_crypt_msg_and_nonce();
-	} else {
-		ut_memcpy(redo_log_crypt_msg, buf + LOG_CRYPT_MSG, AES_128_BLOCK_SIZE);
-		ut_memcpy(aes_ctr_nonce, buf + LOG_CRYPT_IV, AES_128_BLOCK_SIZE);
-	}
 
 	/* Read the first log file header to print a note if this is
 	a recovery from a restored InnoDB Hot Backup */
@@ -3073,15 +3078,10 @@ recv_recovery_from_checkpoint_start_func(
 		/* Start reading the log groups from the checkpoint lsn up. The
 		variable contiguous_lsn contains an lsn up to which the log is
 		known to be contiguously written to all log groups. */
-
 		recv_sys->parse_start_lsn = checkpoint_lsn;
 		recv_sys->scanned_lsn = checkpoint_lsn;
 		recv_sys->scanned_checkpoint_no = 0;
 		recv_sys->recovered_lsn = checkpoint_lsn;
-		recv_sys->recv_log_crypt_ver = recv_crypt_ver;
-		log_init_crypt_key(redo_log_crypt_msg,
-				   recv_sys->recv_log_crypt_ver,
-				   recv_sys->recv_log_crypt_key);
 		srv_start_lsn = checkpoint_lsn;
 	}
 
@@ -3251,8 +3251,9 @@ recv_recovery_from_checkpoint_start_func(
 
 	log_sys->next_checkpoint_lsn = checkpoint_lsn;
 	log_sys->next_checkpoint_no = checkpoint_no + 1;
-	log_crypt_set_ver_and_key(log_sys->redo_log_crypt_ver,
-				  log_sys->redo_log_crypt_key);
+	/* here the checkpoint info is written without any redo logging ongoing
+	* and next_checkpoint_no is updated directly hence no +1 */
+	log_crypt_set_ver_and_key(log_sys->next_checkpoint_no);
 
 #ifdef UNIV_LOG_ARCHIVE
 	log_sys->archived_lsn = archived_lsn;
@@ -3283,8 +3284,7 @@ recv_recovery_from_checkpoint_start_func(
 		    log_sys->lsn - log_sys->last_checkpoint_lsn);
 
 	log_sys->next_checkpoint_no = checkpoint_no + 1;
-	log_crypt_set_ver_and_key(log_sys->redo_log_crypt_ver,
-				  log_sys->redo_log_crypt_key);
+	log_crypt_set_ver_and_key(log_sys->next_checkpoint_no);
 
 #ifdef UNIV_LOG_ARCHIVE
 	if (archived_lsn == LSN_MAX) {
@@ -3486,16 +3486,6 @@ recv_reset_logs(
 
 	log_sys->next_checkpoint_no = 0;
 	log_sys->last_checkpoint_lsn = 0;
-	/* redo_log_crypt_ver will be set by log_checkpoint() to the
-	latest key version. */
-	log_sys->redo_log_crypt_ver = UNENCRYPTED_KEY_VER;
-	/*
-	  Note: flags (srv_encrypt_log and opt_danger_danger_use_dbug_keys)
-	  haven't been read and set yet!
-	  So don't use condition such as:
-	  if (srv_encrypt_log && opt_danger_danger_use_dbug_keys)
-	*/
-	log_init_crypt_msg_and_nonce();
 
 #ifdef UNIV_LOG_ARCHIVE
 	log_sys->archived_lsn = log_sys->lsn;
@@ -3972,4 +3962,3 @@ byte* recv_dblwr_t::find_page(ulint space_id, ulint page_no)
 
 	return(result);
 }
-
diff --git a/storage/innobase/srv/srv0start.cc b/storage/innobase/srv/srv0start.cc
index cd47c67..30df472 100644
--- a/storage/innobase/srv/srv0start.cc
+++ b/storage/innobase/srv/srv0start.cc
@@ -2839,6 +2839,15 @@ files_checked:
 			(ulong) srv_force_recovery);
 	}
 
+	if (!srv_read_only_mode) {
+		/*
+		  Create a checkpoint before logging anything new, so that
+		  the current encryption key in use is definitely logged
+		  before any log blocks encrypted with that key.
+		*/
+		log_make_checkpoint_at(LSN_MAX, TRUE);
+	}
+
 	if (srv_force_recovery == 0) {
 		/* In the insert buffer we may have even bigger tablespace
 		id's, because we may have dropped those tablespaces, but
