diff --git a/mysql-test/suite/rpl/r/rpl_semi_sync_slave_lag.result b/mysql-test/suite/rpl/r/rpl_semi_sync_slave_lag.result
new file mode 100644
index 0000000..f3545a4
--- /dev/null
+++ b/mysql-test/suite/rpl/r/rpl_semi_sync_slave_lag.result
@@ -0,0 +1,150 @@
+include/master-slave.inc
+[connection master]
+set global rpl_semi_sync_master_enabled = 1;
+set global rpl_semi_sync_master_max_slave_lag = 10;
+show variables like 'rpl_semi_sync_master_%slave_lag%';
+Variable_name	Value
+rpl_semi_sync_master_max_slave_lag	10
+rpl_semi_sync_master_slave_lag_heartbeat_frequency_us	500000
+rpl_semi_sync_master_slave_lag_wait_timeout	50
+include/stop_slave.inc
+set global rpl_semi_sync_slave_enabled = 1;
+set global rpl_semi_sync_slave_lag_enabled = 1;
+include/start_slave.inc
+# create non-root user for testing READ_ONLY
+grant SELECT, INSERT on *.* to test@localhost;
+CREATE TABLE t1 (i INT NOT NULL AUTO_INCREMENT PRIMARY KEY, f varchar(8))
+ENGINE=innodb;
+#
+# Check basic behaviour
+#
+INSERT INTO t1 (f) VALUES ('1'),('2'),('3');
+# Now wait for slave lag to decrease to 0
+# [ on slave ]
+STOP SLAVE SQL_THREAD;
+# [ on master ]
+INSERT INTO t1 (f) VALUES ('4'),('5'),('6');
+# Now wait for slave lag to increase to > 0
+# [ on slave ]
+START SLAVE SQL_THREAD;
+# [ on master ]
+# Now wait for slave lag to decrease to 0
+# [ on slave ]
+STOP SLAVE SQL_THREAD;
+# [ on master ]
+set session rpl_semi_sync_master_slave_lag_wait_timeout = 5;
+# First transaction should succeed. slave_lag is zero when it commits
+INSERT INTO t1 (f) VALUES ('7'),('8'),('9');
+# Now wait for slave lag to increase to > 10s
+# Check that estimated_slave_lag is > 10s
+SELECT VARIABLE_VALUE > 10000000 as should_be_1
+FROM INFORMATION_SCHEMA.GLOBAL_STATUS
+WHERE VARIABLE_NAME = 'rpl_semi_sync_master_estimated_slave_lag';
+should_be_1
+1
+# Second transaction should now fail. slave_lag is >10s when it commits
+INSERT INTO t1 (f) VALUES ('a'),('b'),('c');
+ERROR HY000: Slave-lag timeout
+# [ on slave ]
+START SLAVE SQL_THREAD;
+# [ on master ]
+# Now wait for slave lag to decrease to < 10s
+# And now it should succeed again
+INSERT INTO t1 (f) VALUES ('d'),('e'),('f');
+SELECT *
+FROM t1
+ORDER BY 1;
+i	f
+1	1
+2	2
+3	3
+4	4
+5	5
+6	6
+7	7
+8	8
+9	9
+13	d
+14	e
+15	f
+# [ on slave ]
+SELECT *
+FROM t1
+ORDER BY 1;
+i	f
+1	1
+2	2
+3	3
+4	4
+5	5
+6	6
+7	7
+8	8
+9	9
+13	d
+14	e
+15	f
+#
+# Test interaction with READ_ONLY
+#
+# [ on slave ]
+STOP SLAVE SQL_THREAD;
+# [ on master ]
+INSERT INTO t1 (f) VALUES ('g'),('h'),('i');
+# Now wait for slave lag to increase to > 10s
+# [ on con1 ]
+BEGIN;
+INSERT INTO t1 (f) VALUES ('g'),('h'),('i');
+# [ on master ]
+set global read_only = 1;
+# [ on con1 ]
+set session rpl_semi_sync_master_slave_lag_wait_timeout = 5;
+# read-only is check *before* slave lag
+COMMIT;
+ERROR HY000: The MariaDB server is running with the --read-only option so it cannot execute this statement
+# [ on slave ]
+START SLAVE SQL_THREAD;
+# [ on master ]
+# Now wait for slave lag to decrease to 0
+set global read_only = 0;
+#
+# check slave_lag > 0 but less than rpl_semi_sync_master_max_slave_lag
+#
+# [ on slave ]
+STOP SLAVE SQL_THREAD;
+# [ on master ]
+INSERT INTO t1 (f) VALUES ('j'),('k'),('l');
+# Now wait for slave lag to increase to > 0
+# Capture rpl_semi_sync_master_tx_slave_lag_waits before transaction
+select @count_before := VARIABLE_VALUE
+FROM INFORMATION_SCHEMA.GLOBAL_STATUS
+WHERE VARIABLE_NAME = 'rpl_semi_sync_master_tx_slave_lag_waits';
+# Set maximum allowed slave lag to 24h
+set global rpl_semi_sync_master_max_slave_lag = 86400;
+INSERT INTO t1 (f) VALUES ('m'),('n'),('o');
+# Capture rpl_semi_sync_master_tx_slave_lag_waits after transaction
+select @count_after := VARIABLE_VALUE
+FROM INFORMATION_SCHEMA.GLOBAL_STATUS
+WHERE VARIABLE_NAME = 'rpl_semi_sync_master_tx_slave_lag_waits';
+# There should have been no wait, since maximum allowed is very high
+select @count_before = @count_after as should_be_1;
+should_be_1
+1
+# [ on slave ]
+START SLAVE SQL_THREAD;
+# [ on master ]
+# Now wait for slave lag to decrease to 0
+#
+# Clean up
+#
+# [ on master ]
+set global rpl_semi_sync_master_max_slave_lag = default;
+set session rpl_semi_sync_master_slave_lag_wait_timeout = default;
+DROP USER test@localhost;
+include/stop_slave.inc
+set global rpl_semi_sync_slave_enabled = 0;
+set global rpl_semi_sync_slave_lag_enabled = default;
+set global rpl_semi_sync_master_enabled = 0;
+include/start_slave.inc
+DROP TABLE t1;
+include/rpl_end.inc
diff --git a/mysql-test/suite/rpl/t/rpl_semi_sync_slave_lag.test b/mysql-test/suite/rpl/t/rpl_semi_sync_slave_lag.test
new file mode 100644
index 0000000..f9bdbd9
--- /dev/null
+++ b/mysql-test/suite/rpl/t/rpl_semi_sync_slave_lag.test
@@ -0,0 +1,252 @@
+source include/have_semisync.inc;
+source include/not_embedded.inc;
+source include/have_innodb.inc;
+source include/master-slave.inc;
+
+connection master;
+set global rpl_semi_sync_master_enabled = 1;
+set global rpl_semi_sync_master_max_slave_lag = 10;
+
+show variables like 'rpl_semi_sync_master_%slave_lag%';
+
+connection slave;
+source include/stop_slave.inc;
+set global rpl_semi_sync_slave_enabled = 1;
+set global rpl_semi_sync_slave_lag_enabled = 1;
+
+source include/start_slave.inc;
+
+connection master;
+
+--echo # create non-root user for testing READ_ONLY
+grant SELECT, INSERT on *.* to test@localhost;
+connect (con1,localhost,test,,test);
+
+CREATE TABLE t1 (i INT NOT NULL AUTO_INCREMENT PRIMARY KEY, f varchar(8))
+ENGINE=innodb;
+
+--echo #
+--echo # Check basic behaviour
+--echo #
+INSERT INTO t1 (f) VALUES ('1'),('2'),('3');
+
+--echo # Now wait for slave lag to decrease to 0
+let $wait_condition= SELECT VARIABLE_VALUE = 0
+FROM INFORMATION_SCHEMA.GLOBAL_STATUS
+WHERE VARIABLE_NAME = 'rpl_semi_sync_master_estimated_slave_lag';
+--source include/wait_condition.inc
+
+--echo # [ on slave ]
+connection slave;
+STOP SLAVE SQL_THREAD;
+
+--echo # [ on master ]
+connection master;
+INSERT INTO t1 (f) VALUES ('4'),('5'),('6');
+
+--echo # Now wait for slave lag to increase to > 0
+let $wait_condition= SELECT VARIABLE_VALUE > 0
+FROM INFORMATION_SCHEMA.GLOBAL_STATUS
+WHERE VARIABLE_NAME = 'rpl_semi_sync_master_estimated_slave_lag';
+--source include/wait_condition.inc
+
+--echo # [ on slave ]
+connection slave;
+START SLAVE SQL_THREAD;
+
+--echo # [ on master ]
+connection master;
+
+--echo # Now wait for slave lag to decrease to 0
+let $wait_condition= SELECT VARIABLE_VALUE = 0
+FROM INFORMATION_SCHEMA.GLOBAL_STATUS
+WHERE VARIABLE_NAME = 'rpl_semi_sync_master_estimated_slave_lag';
+--source include/wait_condition.inc
+
+--echo # [ on slave ]
+connection slave;
+STOP SLAVE SQL_THREAD;
+
+--echo # [ on master ]
+connection master;
+
+set session rpl_semi_sync_master_slave_lag_wait_timeout = 5;
+
+--echo # First transaction should succeed. slave_lag is zero when it commits
+INSERT INTO t1 (f) VALUES ('7'),('8'),('9');
+
+--echo # Now wait for slave lag to increase to > 10s
+let $wait_condition= SELECT VARIABLE_VALUE > 10000000
+FROM INFORMATION_SCHEMA.GLOBAL_STATUS
+WHERE VARIABLE_NAME = 'rpl_semi_sync_master_estimated_slave_lag';
+--source include/wait_condition.inc
+
+--echo # Check that estimated_slave_lag is > 10s
+SELECT VARIABLE_VALUE > 10000000 as should_be_1
+FROM INFORMATION_SCHEMA.GLOBAL_STATUS
+WHERE VARIABLE_NAME = 'rpl_semi_sync_master_estimated_slave_lag';
+
+--echo # Second transaction should now fail. slave_lag is >10s when it commits
+--error ER_ERROR_DURING_COMMIT
+INSERT INTO t1 (f) VALUES ('a'),('b'),('c');
+
+--echo # [ on slave ]
+connection slave;
+START SLAVE SQL_THREAD;
+
+--echo # [ on master ]
+connection master;
+
+--echo # Now wait for slave lag to decrease to < 10s
+let $wait_condition= SELECT VARIABLE_VALUE < 10000000
+FROM INFORMATION_SCHEMA.GLOBAL_STATUS
+WHERE VARIABLE_NAME = 'rpl_semi_sync_master_estimated_slave_lag';
+--source include/wait_condition.inc
+
+--echo # And now it should succeed again
+INSERT INTO t1 (f) VALUES ('d'),('e'),('f');
+
+SELECT *
+FROM t1
+ORDER BY 1;
+
+sync_slave_with_master;
+
+--echo # [ on slave ]
+connection slave;
+
+SELECT *
+FROM t1
+ORDER BY 1;
+
+--echo #
+--echo # Test interaction with READ_ONLY
+--echo #
+
+--echo # [ on slave ]
+connection slave;
+STOP SLAVE SQL_THREAD;
+
+--echo # [ on master ]
+connection master;
+
+INSERT INTO t1 (f) VALUES ('g'),('h'),('i');
+
+--echo # Now wait for slave lag to increase to > 10s
+let $wait_condition= SELECT VARIABLE_VALUE > 10000000
+FROM INFORMATION_SCHEMA.GLOBAL_STATUS
+WHERE VARIABLE_NAME = 'rpl_semi_sync_master_estimated_slave_lag';
+--source include/wait_condition.inc
+
+connection con1;
+--echo # [ on con1 ]
+BEGIN;
+INSERT INTO t1 (f) VALUES ('g'),('h'),('i');
+
+connection master;
+--echo # [ on master ]
+set global read_only = 1;
+
+connection con1;
+--echo # [ on con1 ]
+set session rpl_semi_sync_master_slave_lag_wait_timeout = 5;
+
+--echo # read-only is check *before* slave lag
+--error ER_OPTION_PREVENTS_STATEMENT
+COMMIT;
+
+disconnect con1;
+
+--echo # [ on slave ]
+connection slave;
+START SLAVE SQL_THREAD;
+
+connection master;
+--echo # [ on master ]
+
+--echo # Now wait for slave lag to decrease to 0
+let $wait_condition= SELECT VARIABLE_VALUE = 0
+FROM INFORMATION_SCHEMA.GLOBAL_STATUS
+WHERE VARIABLE_NAME = 'rpl_semi_sync_master_estimated_slave_lag';
+--source include/wait_condition.inc
+
+set global read_only = 0;
+
+--echo #
+--echo # check slave_lag > 0 but less than rpl_semi_sync_master_max_slave_lag
+--echo #
+
+--echo # [ on slave ]
+connection slave;
+STOP SLAVE SQL_THREAD;
+
+connection master;
+--echo # [ on master ]
+INSERT INTO t1 (f) VALUES ('j'),('k'),('l');
+
+--echo # Now wait for slave lag to increase to > 0
+let $wait_condition= SELECT VARIABLE_VALUE > 0
+FROM INFORMATION_SCHEMA.GLOBAL_STATUS
+WHERE VARIABLE_NAME = 'rpl_semi_sync_master_estimated_slave_lag';
+--source include/wait_condition.inc
+
+--echo # Capture rpl_semi_sync_master_tx_slave_lag_waits before transaction
+--disable_result_log
+select @count_before := VARIABLE_VALUE
+FROM INFORMATION_SCHEMA.GLOBAL_STATUS
+WHERE VARIABLE_NAME = 'rpl_semi_sync_master_tx_slave_lag_waits';
+--enable_result_log
+
+--echo # Set maximum allowed slave lag to 24h
+set global rpl_semi_sync_master_max_slave_lag = 86400; # 24h
+
+INSERT INTO t1 (f) VALUES ('m'),('n'),('o');
+
+--echo # Capture rpl_semi_sync_master_tx_slave_lag_waits after transaction
+--disable_result_log
+select @count_after := VARIABLE_VALUE
+FROM INFORMATION_SCHEMA.GLOBAL_STATUS
+WHERE VARIABLE_NAME = 'rpl_semi_sync_master_tx_slave_lag_waits';
+--enable_result_log
+
+--echo # There should have been no wait, since maximum allowed is very high
+select @count_before = @count_after as should_be_1;
+
+--echo # [ on slave ]
+connection slave;
+START SLAVE SQL_THREAD;
+
+connection master;
+--echo # [ on master ]
+
+--echo # Now wait for slave lag to decrease to 0
+let $wait_condition= SELECT VARIABLE_VALUE = 0
+FROM INFORMATION_SCHEMA.GLOBAL_STATUS
+WHERE VARIABLE_NAME = 'rpl_semi_sync_master_estimated_slave_lag';
+--source include/wait_condition.inc
+
+--echo #
+--echo # Clean up
+--echo #
+connection master;
+--echo # [ on master ]
+set global rpl_semi_sync_master_max_slave_lag = default;
+set session rpl_semi_sync_master_slave_lag_wait_timeout = default;
+DROP USER test@localhost;
+
+connection slave;
+source include/stop_slave.inc;
+set global rpl_semi_sync_slave_enabled = 0;
+set global rpl_semi_sync_slave_lag_enabled = default;
+
+connection master;
+set global rpl_semi_sync_master_enabled = 0;
+
+connection slave;
+source include/start_slave.inc;
+
+connection master;
+
+DROP TABLE t1;
+sync_slave_with_master;
+--source include/rpl_end.inc
diff --git a/plugin/semisync/semisync.cc b/plugin/semisync/semisync.cc
index 4a80360..fc02b9d 100644
--- a/plugin/semisync/semisync.cc
+++ b/plugin/semisync/semisync.cc
@@ -20,6 +20,7 @@
 
 const unsigned char ReplSemiSyncBase::kPacketMagicNum = 0xef;
 const unsigned char ReplSemiSyncBase::kPacketFlagSync = 0x01;
+const unsigned char ReplSemiSyncBase::kPacketFlagSyncAndReport = 0x02;
 
 
 const unsigned long Trace::kTraceGeneral  = 0x0001;
@@ -29,3 +30,6 @@ const unsigned long Trace::kTraceFunction = 0x0040;
 
 const unsigned char  ReplSemiSyncBase::kSyncHeader[2] =
   {ReplSemiSyncBase::kPacketMagicNum, 0};
+
+const char* const ReplSemiSyncBase::kRplSemiSyncSlaveReportExec =
+    "rpl_semi_sync_slave_report_exec";
diff --git a/plugin/semisync/semisync.h b/plugin/semisync/semisync.h
index 2857729..78faba9 100644
--- a/plugin/semisync/semisync.h
+++ b/plugin/semisync/semisync.h
@@ -75,13 +75,23 @@ class ReplSemiSyncBase
 
   /* Constants in network packet header. */
   static const unsigned char kPacketMagicNum;
+  /* this event should be semisync acked */
   static const unsigned char kPacketFlagSync;
+  /* this event should be semisync acked including the current SQL position */
+  static const unsigned char kPacketFlagSyncAndReport;
+
+  /* user variable for enabling exec-pos reporting */
+  static const char* const kRplSemiSyncSlaveReportExec;
 };
 
 /* The layout of a semisync slave reply packet:
    1 byte for the magic num
    8 bytes for the binlog positon
-   n bytes for the binlog filename, terminated with a '\0'
+   n bytes for the binlog filename, NOT terminated with a '\0'
+   [ optionally ]
+   1 byte == 0
+   8 bytes for the sql-thread position
+   n bytes for the sql-thread filename, terminated with a '\0'
 */
 #define REPLY_MAGIC_NUM_LEN 1
 #define REPLY_BINLOG_POS_LEN 8
diff --git a/plugin/semisync/semisync_master.cc b/plugin/semisync/semisync_master.cc
index c88c162..3ee6e26 100644
--- a/plugin/semisync/semisync_master.cc
+++ b/plugin/semisync/semisync_master.cc
@@ -22,6 +22,9 @@
 #define TIME_MILLION  1000000
 #define TIME_BILLION  1000000000
 
+/* thd_key for per slave thread state */
+static MYSQL_THD_KEY_T thd_key;
+
 /* This indicates whether semi-synchronous replication is enabled. */
 char rpl_semi_sync_master_enabled;
 unsigned long rpl_semi_sync_master_wait_point       =
@@ -45,6 +48,18 @@ unsigned long long rpl_semi_sync_master_net_wait_time = 0;
 unsigned long long rpl_semi_sync_master_trx_wait_time = 0;
 char rpl_semi_sync_master_wait_no_slave = 1;
 
+unsigned long rpl_semi_sync_master_max_unacked_event_count = 0;
+unsigned long rpl_semi_sync_master_max_unacked_event_bytes = 4096;
+
+unsigned long rpl_semi_sync_master_slave_lag_clients = 0;
+unsigned long long rpl_semi_sync_master_estimated_slave_lag = 0;
+unsigned long rpl_semi_sync_master_slave_lag_heartbeat_frequency_us = 500000;
+unsigned long rpl_semi_sync_master_max_slave_lag = 0;
+unsigned long rpl_semi_sync_master_slave_lag_wait_sessions = 0;
+
+unsigned long rpl_semi_sync_master_avg_trx_slave_lag_wait_time = 0;
+unsigned long long rpl_semi_sync_master_trx_slave_lag_wait_num = 0;
+unsigned long long rpl_semi_sync_master_trx_slave_lag_wait_time = 0;
 
 static int getWaitTime(const struct timespec& start_ts);
 
@@ -150,6 +165,15 @@ int ActiveTranx::insert_tranx_node(const char *log_file_name,
   ins_node->log_name_[FN_REFLEN-1] = 0; /* make sure it ends properly */
   ins_node->log_pos_ = log_file_pos;
 
+  {
+    /**
+     * set trans commit time
+     *   this is called when writing into binlog, which is not
+     *   exactly right, but close enough for our purposes
+     */
+    ins_node->tranx_commit_time_us = my_hrtime().val;
+  }
+
   if (!trx_front_)
   {
     /* The list is empty. */
@@ -193,12 +217,11 @@ int ActiveTranx::insert_tranx_node(const char *log_file_name,
   return function_exit(kWho, result);
 }
 
-bool ActiveTranx::is_tranx_end_pos(const char *log_file_name,
-				   my_off_t    log_file_pos)
+TranxNode* ActiveTranx::lookup_tranx_end_pos(const char *log_file_name,
+                                             my_off_t log_file_pos)
 {
-  const char *kWho = "ActiveTranx::is_tranx_end_pos";
+  const char *kWho = "ActiveTranx::lookup_tranx_end_pos";
   function_enter(kWho);
-
   unsigned int hash_val = get_hash_value(log_file_name, log_file_pos);
   TranxNode *entry = trx_htb_[hash_val];
 
@@ -211,38 +234,24 @@ bool ActiveTranx::is_tranx_end_pos(const char *log_file_name,
   }
 
   if (trace_level_ & kTraceDetail)
-    sql_print_information("%s: probe (%s, %lu) in entry(%u)", kWho,
-                          log_file_name, (unsigned long)log_file_pos, hash_val);
+    sql_print_information("%s: probe (%s, %lu)", kWho,
+                          log_file_name, (unsigned long)log_file_pos);
 
   function_exit(kWho, (entry != NULL));
-  return (entry != NULL);
+  return entry;
 }
 
-int ActiveTranx::clear_active_tranx_nodes(const char *log_file_name,
-					  my_off_t log_file_pos)
+int ActiveTranx::clear_active_tranx_nodes()
 {
-  const char *kWho = "ActiveTranx::::clear_active_tranx_nodes";
-  TranxNode *new_front;
+  set_new_front(NULL);
+  return 0;
+}
 
+void ActiveTranx::set_new_front(TranxNode *new_front)
+{
+  const char *kWho = "ActiveTranx::set_new_front";
   function_enter(kWho);
 
-  if (log_file_name != NULL)
-  {
-    new_front = trx_front_;
-
-    while (new_front)
-    {
-      if (compare(new_front, log_file_name, log_file_pos) > 0)
-        break;
-      new_front = new_front->next_;
-    }
-  }
-  else
-  {
-    /* If log_file_name is NULL, clear everything. */
-    new_front = NULL;
-  }
-
   if (new_front == NULL)
   {
     /* No active transaction nodes after the call. */
@@ -257,7 +266,6 @@ int ActiveTranx::clear_active_tranx_nodes(const char *log_file_name,
       trx_front_ = NULL;
       trx_rear_  = NULL;
     }
-
     if (trace_level_ & kTraceDetail)
       sql_print_information("%s: cleared all nodes", kWho);
   }
@@ -291,14 +299,40 @@ int ActiveTranx::clear_active_tranx_nodes(const char *log_file_name,
 
     trx_front_ = new_front;
     allocator_.free_nodes_before(trx_front_);
-
     if (trace_level_ & kTraceDetail)
       sql_print_information("%s: cleared %d nodes back until pos (%s, %lu)",
                             kWho, n_frees,
                             trx_front_->log_name_, (unsigned long)trx_front_->log_pos_);
   }
+  function_exit(kWho, 0);
+}
 
-  return function_exit(kWho, 0);
+bool ActiveTranx::prune_active_tranx_nodes(
+    LogPosPtr pos,
+    ulonglong *oldest_tranx_commit_time_us)
+{
+  TranxNode *old_front = trx_front_;
+  TranxNode *new_front;
+
+  new_front = trx_front_;
+  while (new_front)
+  {
+    if (compare(new_front, pos.file_name, pos.file_pos) > 0)
+      break;
+    new_front = new_front->next_;
+  }
+
+  set_new_front(new_front);
+
+  if (oldest_tranx_commit_time_us)
+  {
+    if (trx_front_ == NULL)
+      *oldest_tranx_commit_time_us = 0;
+    else
+      *oldest_tranx_commit_time_us = trx_front_->tranx_commit_time_us;
+  }
+
+  return ! (old_front == trx_front_);
 }
 
 
@@ -334,7 +368,8 @@ ReplSemiSyncMaster::ReplSemiSyncMaster()
     wait_file_pos_(0),
     master_enabled_(false),
     wait_timeout_(0L),
-    state_(0)
+    state_(0),
+    oldest_unapplied_tranx_commit_time_us_(0)
 {
   strcpy(reply_file_name_, "");
   strcpy(wait_file_name_, "");
@@ -362,11 +397,19 @@ int ReplSemiSyncMaster::initObject()
   mysql_cond_init(key_ss_cond_COND_binlog_send_,
                   &COND_binlog_send_, NULL);
 
+  /* Mutex initialization can only be done after MY_INIT(). */
+  mysql_mutex_init(key_ss_mutex_LOCK_slave_lag_,
+                   &LOCK_slave_lag_, MY_MUTEX_INIT_FAST);
+  mysql_cond_init(key_ss_cond_COND_slave_lag_,
+                  &COND_slave_lag_, NULL);
+
   if (rpl_semi_sync_master_enabled)
     result = enableMaster();
   else
     result = disableMaster();
 
+  thd_key_create(&thd_key);
+
   return result;
 }
 
@@ -437,6 +480,8 @@ void ReplSemiSyncMaster::cleanup()
   {
     mysql_mutex_destroy(&LOCK_binlog_);
     mysql_cond_destroy(&COND_binlog_send_);
+    mysql_mutex_destroy(&LOCK_slave_lag_);
+    mysql_cond_destroy(&COND_slave_lag_);
     init_done_= 0;
   }
 
@@ -473,7 +518,34 @@ void ReplSemiSyncMaster::add_slave()
 {
   lock();
   rpl_semi_sync_master_clients++;
+  if (has_semi_sync_slave_lag())
+    rpl_semi_sync_master_slave_lag_clients++;
   unlock();
+
+  if (has_semi_sync_slave_lag())
+  {
+    int null_val = 0;
+    longlong new_val =
+        rpl_semi_sync_master_slave_lag_heartbeat_frequency_us * 1000;
+    longlong old_val = new_val + 1;
+
+    get_user_var_int("master_heartbeat_period", &old_val, &null_val);
+    if (old_val > new_val || null_val)
+    {
+      /* if there no old value or it's bigger than what we want */
+      int res = set_user_var_int("master_heartbeat_period",new_val, &old_val);
+      if (res == -1)
+      {
+        sql_print_error(
+            "Repl_semi_sync::failed to set master_heartbeat_period");
+      }
+    }
+  }
+
+  /**
+   * create per slave-state and store it in thread-local-storage */
+  ReplSemiSyncMasterPerSlaveState *state = new ReplSemiSyncMasterPerSlaveState;
+  thd_setspecific(current_thd, thd_key, state);
 }
 
 void ReplSemiSyncMaster::remove_slave()
@@ -492,7 +564,31 @@ void ReplSemiSyncMaster::remove_slave()
         rpl_semi_sync_master_clients == 0)
       switch_off();
   }
+
+  bool no_slave_lag_clients = false;
+  if (has_semi_sync_slave_lag())
+  {
+    if (--rpl_semi_sync_master_slave_lag_clients == 0)
+    {
+      no_slave_lag_clients = true;
+    }
+  }
+
   unlock();
+
+  ReplSemiSyncMasterPerSlaveState *state =
+      (ReplSemiSyncMasterPerSlaveState*)thd_getspecific(current_thd, thd_key);
+  thd_setspecific(current_thd, thd_key, NULL);
+
+  if (state != NULL)
+  {
+    delete state;
+  }
+
+  if (no_slave_lag_clients)
+  {
+    wake_slave_lag_waiters(0);
+  }
 }
 
 bool ReplSemiSyncMaster::is_semi_sync_slave()
@@ -503,14 +599,115 @@ bool ReplSemiSyncMaster::is_semi_sync_slave()
   return val;
 }
 
+bool ReplSemiSyncMaster::has_semi_sync_slave_lag()
+{
+  int null_value;
+  long long val= 0;
+  get_user_var_int(kRplSemiSyncSlaveReportExec, &val, &null_value);
+  return val;
+}
+
+int ReplSemiSyncMaster::checkSyncReq(const LogPosPtr *log_pos)
+{
+  if (log_pos == NULL)
+  {
+    /* heartbeat events does not have logpos (since they are not actually
+     * stored in the binlog).
+     */
+    if (!has_semi_sync_slave_lag())
+    {
+      /* don't semi-sync them if we haven't enabled slave-lag handling */
+      return 0;
+    }
+    else
+    {
+      /* else ask for both IO and exec position */
+      return 2;
+    }
+  }
+
+  /**
+   * check if this log-pos is a candidate for semi-syncing event
+   */
+  TranxNode *entry = active_tranxs_->lookup_tranx_end_pos(log_pos->file_name,
+                                                          log_pos->file_pos);
+
+  if (entry == NULL)
+    return 0;
+
+  ReplSemiSyncMasterPerSlaveState *state =
+      (ReplSemiSyncMasterPerSlaveState*)thd_getspecific(current_thd,
+                                                        thd_key);
+  do
+  {
+    state->unacked_event_count_++;
+
+    if (active_tranxs_->is_rear(entry))
+    {
+      /* always ask for ack on last event in tranx list */
+      break;
+    }
+
+    if (state->unacked_event_count_ >=
+        rpl_semi_sync_master_max_unacked_event_count)
+    {
+      /* enough events passed that it's time for another ack */
+      break;
+    }
+
+    if (!state->sync_req_pos_.IsInited())
+    {
+      /* first event => time for ack */
+      break;
+    }
+
+    if (strcmp(log_pos->file_name, state->sync_req_pos_.file_name) != 0)
+    {
+      /* new file => time for ack */
+      break;
+    }
+
+    if (log_pos->file_pos >= (state->sync_req_pos_.file_pos +
+                              rpl_semi_sync_master_max_unacked_event_bytes))
+    {
+      /* enough bytes => time for ack */
+      break;
+    }
+
+    /* we skip asking for semi-sync ack on this event */
+    return 0;
+
+  } while (0);
+
+  /* keep track on when we last asked for semi-sync-ack */
+  state->unacked_event_count_ = 0;
+  state->sync_req_pos_.Assign(log_pos);
+
+  /**
+   * check if this slave can report back exec position
+   */
+  if (!has_semi_sync_slave_lag())
+  {
+    /* slave can't report back SQL position */
+    return 1;
+  }
+
+  /* ask for both IO and SQL position */
+  return 2;
+}
+
 int ReplSemiSyncMaster::reportReplyBinlog(uint32 server_id,
 					  const char *log_file_name,
-					  my_off_t log_file_pos)
+					  my_off_t log_file_pos,
+                                          const LogPos *exec_pos)
 {
   const char *kWho = "ReplSemiSyncMaster::reportReplyBinlog";
   int   cmp;
   bool  can_release_threads = false;
   bool  need_copy_send_pos = true;
+  bool  pruned_trx_list = false;
+  ulonglong oldest_tranx_commit_time_us = 0;
+
 
   if (!(getMasterEnabled()))
     return 0;
@@ -559,15 +756,29 @@ int ReplSemiSyncMaster::reportReplyBinlog(uint32 server_id,
     reply_file_pos_ = log_file_pos;
     reply_file_name_inited_ = true;
 
-    /* Remove all active transaction nodes before this point. */
-    assert(active_tranxs_ != NULL);
-    active_tranxs_->clear_active_tranx_nodes(log_file_name, log_file_pos);
-
     if (trace_level_ & kTraceDetail)
       sql_print_information("%s: Got reply at (%s, %lu)", kWho,
                             log_file_name, (unsigned long)log_file_pos);
   }
 
+  assert(active_tranxs_ != NULL);
+  if (exec_pos != NULL)
+  {
+    /* prune using exec_pos */
+    LogPosPtr ptr(*exec_pos);
+    pruned_trx_list = active_tranxs_->prune_active_tranx_nodes(
+        ptr, &oldest_tranx_commit_time_us);
+  }
+  else if (rpl_semi_sync_master_slave_lag_clients == 0 && need_copy_send_pos)
+  {
+    /**
+     * if we don't have any slaves that can do exec_pos reporting,
+     * prune by IO position as "plain old semi sync"
+     */
+    LogPosPtr ptr(log_file_name, log_file_pos);
+    active_tranxs_->prune_active_tranx_nodes(ptr, NULL);
+  }
+
   if (rpl_semi_sync_master_wait_sessions > 0)
   {
     /* Let us check if some of the waiting threads doing a trx
@@ -596,6 +807,15 @@ int ReplSemiSyncMaster::reportReplyBinlog(uint32 server_id,
     cond_broadcast();
   }
 
+  if (pruned_trx_list)
+  {
+    /**
+     * if we did prune trx list, it might be that we should wake up
+     * threads waiting for slave-lag to decrease
+     */
+    wake_slave_lag_waiters(oldest_tranx_commit_time_us);
+  }
+
   return function_exit(kWho, 0);
 }
 
@@ -743,14 +963,6 @@ int ReplSemiSyncMaster::commitTrx(const char* trx_wait_binlog_name,
       }
     }
 
-    /*
-      At this point, the binlog file and position of this transaction
-      must have been removed from ActiveTranx.
-    */
-    assert(thd_killed(NULL) ||
-           !active_tranxs_->is_tranx_end_pos(trx_wait_binlog_name,
-                                             trx_wait_binlog_pos));
-    
   l_end:
     /* Update the status counter. */
     if (is_on())
@@ -794,7 +1006,7 @@ int ReplSemiSyncMaster::switch_off()
 
   /* Clear the active transaction list. */
   assert(active_tranxs_ != NULL);
-  result = active_tranxs_->clear_active_tranx_nodes(NULL, 0);
+  result = active_tranxs_->clear_active_tranx_nodes();
 
   rpl_semi_sync_master_off_times++;
   wait_file_name_inited_   = false;
@@ -884,7 +1096,7 @@ int ReplSemiSyncMaster::updateSyncHeader(unsigned char *packet,
 {
   const char *kWho = "ReplSemiSyncMaster::updateSyncHeader";
   int  cmp = 0;
-  bool sync = false;
+  int sync = 0;
 
   /* If the semi-sync master is not enabled, or the slave is not a semi-sync
    * target, do not request replies from the slave.
@@ -905,6 +1117,13 @@ int ReplSemiSyncMaster::updateSyncHeader(unsigned char *packet,
     /* semi-sync is ON */
     /* sync= false; No sync unless a transaction is involved. */
 
+    if (log_file_name == NULL)
+    {
+      /* this is heartbeat, request io_pos and exec_pos */
+      sync = checkSyncReq(0);
+      goto l_end;
+    }
+
     if (reply_file_name_inited_)
     {
       cmp = ActiveTranx::compare(log_file_name, log_file_pos,
@@ -933,12 +1152,12 @@ int ReplSemiSyncMaster::updateSyncHeader(unsigned char *packet,
      */
     if (cmp >= 0)
     {
-      /* 
+      /*
        * We only wait if the event is a transaction's ending event.
        */
       assert(active_tranxs_ != NULL);
-      sync = active_tranxs_->is_tranx_end_pos(log_file_name,
-                                               log_file_pos);
+      LogPosPtr pos(log_file_name, log_file_pos);
+      sync = checkSyncReq(&pos);
     }
   }
   else
@@ -951,7 +1170,7 @@ int ReplSemiSyncMaster::updateSyncHeader(unsigned char *packet,
     }
     else
     {
-      sync = true;
+      sync = 1;
     }
   }
 
@@ -966,10 +1185,14 @@ int ReplSemiSyncMaster::updateSyncHeader(unsigned char *packet,
   /* We do not need to clear sync flag because we set it to 0 when we
    * reserve the packet header.
    */
-  if (sync)
+  if (sync == 1)
   {
     (packet)[2] = kPacketFlagSync;
   }
+  else if (sync == 2)
+  {
+    (packet)[2] = kPacketFlagSyncAndReport;
+  }
 
   return function_exit(kWho, 0);
 }
@@ -1018,7 +1241,8 @@ int ReplSemiSyncMaster::writeTranxInBinlog(const char* log_file_name,
   if (is_on())
   {
     assert(active_tranxs_ != NULL);
-    if(active_tranxs_->insert_tranx_node(log_file_name, log_file_pos))
+    bool empty = active_tranxs_->is_empty();
+    if (active_tranxs_->insert_tranx_node(log_file_name, log_file_pos))
     {
       /*
         if insert tranx_node failed, print a warning message
@@ -1028,6 +1252,14 @@ int ReplSemiSyncMaster::writeTranxInBinlog(const char* log_file_name,
                         log_file_name, (ulong)log_file_pos);
       switch_off();
     }
+    else if (empty && rpl_semi_sync_master_slave_lag_clients > 0)
+    {
+      /* if the list of transactions was empty,
+       * we need to init the oldest_tranx_commit_time_us
+       */
+      oldest_unapplied_tranx_commit_time_us_ =
+          active_tranxs_->get_oldest_tranx_commit_time_us();
+    }
   }
 
  l_end:
@@ -1037,10 +1269,10 @@ int ReplSemiSyncMaster::writeTranxInBinlog(const char* log_file_name,
 }
 
 int ReplSemiSyncMaster::readSlaveReply(NET *net, uint32 server_id,
-                                       const char *event_buf)
+                                       const char *event_buf_)
 {
   const char *kWho = "ReplSemiSyncMaster::readSlaveReply";
-  const unsigned char *packet;
+  const unsigned char *packet, *packet_start;
   char     log_file_name[FN_REFLEN];
   my_off_t log_file_pos;
   ulong    log_file_len = 0;
@@ -1048,12 +1280,15 @@ int ReplSemiSyncMaster::readSlaveReply(NET *net, uint32 server_id,
   int      result = -1;
   struct timespec start_ts;
   ulong trc_level = trace_level_;
+  const unsigned char *event_buf = (const unsigned char*)event_buf_;
+  bool exec_pos_present = false; // is SQL exec pos present in reply
+  LogPos   exec_pos;             // position of SQL thread
   LINT_INIT_STRUCT(start_ts);
 
   function_enter(kWho);
 
-  assert((unsigned char)event_buf[1] == kPacketMagicNum);
-  if ((unsigned char)event_buf[2] != kPacketFlagSync)
+  assert(event_buf[1] == kPacketMagicNum);
+  if ((event_buf[2] & (kPacketFlagSync | kPacketFlagSyncAndReport)) == 0)
   {
     /* current event does not require reply */
     result = 0;
@@ -1111,28 +1346,60 @@ int ReplSemiSyncMaster::readSlaveReply(NET *net, uint32 server_id,
     goto l_end;
   }
 
-  packet = net->read_pos;
+  packet_start = packet = net->read_pos;
   if (packet[REPLY_MAGIC_NUM_OFFSET] != ReplSemiSyncMaster::kPacketMagicNum)
   {
     sql_print_error("Read semi-sync reply magic number error");
     goto l_end;
   }
 
+  /* we determine if this semisync ack contains a sql-thread exec-pos
+   * by checking if last byte == 0, since the packet then contains
+   * \0-terminated filenames */
+  exec_pos_present = packet[packet_len - 1] == 0;
+
   log_file_pos = uint8korr(packet + REPLY_BINLOG_POS_OFFSET);
-  log_file_len = packet_len - REPLY_BINLOG_NAME_OFFSET;
+  if (exec_pos_present == false)
+  {
+    log_file_len = packet_len - REPLY_BINLOG_NAME_OFFSET;
+  }
+  else
+  {
+    log_file_len = strnlen((char*)packet + REPLY_BINLOG_NAME_OFFSET,
+                           MY_MIN((ulong)FN_REFLEN,
+                                  packet_len - REPLY_BINLOG_NAME_OFFSET));
+  }
   if (log_file_len >= FN_REFLEN)
   {
     sql_print_error("Read semi-sync reply binlog file length too large");
     goto l_end;
   }
-  strncpy(log_file_name, (const char*)packet + REPLY_BINLOG_NAME_OFFSET, log_file_len);
+  packet+= REPLY_BINLOG_NAME_OFFSET;
+
+  strncpy(log_file_name, (const char*)packet, log_file_len);
   log_file_name[log_file_len] = 0;
 
+  if (exec_pos_present)
+  {
+    packet += log_file_len + 1;
+    if (packet + 8 + 1 >= (packet_start + packet_len))
+    {
+      sql_print_error("Read semi-sync reply binlog. "
+                      "Packet to short to contain exec-position!");
+      goto l_end;
+    }
+    exec_pos.file_pos = uint8korr(packet);
+    packet += 8;
+    strncpy(exec_pos.file_name, (char*)packet,
+            (packet_start + packet_len) - packet);
+  }
+
   if (trc_level & kTraceDetail)
     sql_print_information("%s: Got reply (%s, %lu)",
                           kWho, log_file_name, (ulong)log_file_pos);
 
-  result = reportReplyBinlog(server_id, log_file_name, log_file_pos);
+  result = reportReplyBinlog(server_id, log_file_name, log_file_pos,
+                             exec_pos_present ? &exec_pos : NULL);
 
  l_end:
   return function_exit(kWho, result);
@@ -1154,6 +1421,16 @@ int ReplSemiSyncMaster::resetMaster()
   wait_file_name_inited_   = false;
   reply_file_name_inited_  = false;
   commit_file_name_inited_ = false;
+  if (active_tranxs_ != NULL)
+  {
+    /**
+     * make sure to empty transaction hash/list
+     * with slave-lag reporting this container does
+     * not have to be empty even if no transaction is
+     * currently running
+     */
+    active_tranxs_->clear_active_tranx_nodes();
+  }
 
   rpl_semi_sync_master_yes_transactions = 0;
   rpl_semi_sync_master_no_transactions = 0;
@@ -1168,6 +1445,13 @@ int ReplSemiSyncMaster::resetMaster()
 
   unlock();
 
+  mysql_mutex_lock(&LOCK_slave_lag_);
+  rpl_semi_sync_master_slave_lag_wait_sessions = 0;
+  oldest_unapplied_tranx_commit_time_us_ = 0;
+  rpl_semi_sync_master_trx_slave_lag_wait_num = 0;
+  rpl_semi_sync_master_trx_slave_lag_wait_time = 0;
+  mysql_mutex_unlock(&LOCK_slave_lag_);
+
   return function_exit(kWho, result);
 }
 
@@ -1186,6 +1470,29 @@ void ReplSemiSyncMaster::setExportStats()
                      ((double)rpl_semi_sync_master_net_wait_num)) : 0);
 
   unlock();
+
+  if (oldest_unapplied_tranx_commit_time_us_ != 0)
+  {
+    rpl_semi_sync_master_estimated_slave_lag = my_hrtime().val -
+        oldest_unapplied_tranx_commit_time_us_;
+  }
+  else
+  {
+    rpl_semi_sync_master_estimated_slave_lag = 0;
+  }
+
+  mysql_mutex_lock(&LOCK_slave_lag_);
+  if (rpl_semi_sync_master_trx_slave_lag_wait_num)
+  {
+    rpl_semi_sync_master_avg_trx_slave_lag_wait_time =
+        (unsigned long)((double)rpl_semi_sync_master_trx_slave_lag_wait_time /
+                        (double)rpl_semi_sync_master_trx_slave_lag_wait_num);
+  }
+  else
+  {
+    rpl_semi_sync_master_avg_trx_slave_lag_wait_time = 0;
+  }
+  mysql_mutex_unlock(&LOCK_slave_lag_);
 }
 
 /* Get the waiting time given the wait's staring time.
@@ -1213,3 +1520,117 @@ static int getWaitTime(const struct timespec& start_ts)
 
   return (int)(end_usecs - start_usecs);
 }
+
+void ReplSemiSyncMaster::wake_slave_lag_waiters(
+    ulonglong oldest_unapplied_tranx_commit_time_us)
+{
+  mysql_mutex_lock(&LOCK_slave_lag_);
+  oldest_unapplied_tranx_commit_time_us_ =
+      oldest_unapplied_tranx_commit_time_us;
+
+  if (rpl_semi_sync_master_slave_lag_wait_sessions > 0)
+  {
+    mysql_cond_broadcast(&COND_slave_lag_);
+  }
+  mysql_mutex_unlock(&LOCK_slave_lag_);
+}
+
+int ReplSemiSyncMaster::wait_slave_lag(ulong timeout_sec)
+{
+  int error = 0;
+  PSI_stage_info old_stage;
+
+  /* slave lag waiting not enabled, return directly */
+  if (rpl_semi_sync_master_max_slave_lag == 0)
+    return 0;
+
+  /* there is no slave that can report slave lag, return directly */
+  if (rpl_semi_sync_master_slave_lag_clients == 0)
+    return 0;
+
+  /* compute start_time and end_time */
+  struct timespec end_time;
+  set_timespec(end_time, 0);
+  ulonglong start_time_us = timespec_to_usec(&end_time);
+  end_time.tv_sec += timeout_sec;
+
+  mysql_mutex_lock(&LOCK_slave_lag_);
+
+  if (oldest_unapplied_tranx_commit_time_us_ == 0)
+  {
+    /* no slave lag, atleast one slave is up to date */
+    mysql_mutex_unlock(&LOCK_slave_lag_);
+    return 0;
+  }
+
+  if (rpl_semi_sync_master_max_slave_lag == 0)
+  {
+    /* slave lag waiting not enabled */
+    mysql_mutex_unlock(&LOCK_slave_lag_);
+    return 0;
+  }
+
+  /* This must be called after acquired the lock */
+  THD_ENTER_COND(NULL, &COND_slave_lag_, &LOCK_slave_lag_,
+                 &stage_waiting_for_semi_sync_slave_lag,
+                 &old_stage);
+
+  bool waited = false;
+  ulonglong lag = 0;
+  ulonglong max_lag = 0;
+  while (oldest_unapplied_tranx_commit_time_us_ != 0)
+  {
+    /* check kill_level after THD_ENTER_COND but *before* cond_wait
+     * to avoid missing kills */
+    if (! (getMasterEnabled() && is_on() &&
+           thd_kill_level(NULL) == THD_IS_NOT_KILLED))
+      break;
+
+    lag = start_time_us - oldest_unapplied_tranx_commit_time_us_;
+    max_lag = 1000000 * rpl_semi_sync_master_max_slave_lag;
+    if (lag <= max_lag)
+      break;
+
+    waited = true;
+    rpl_semi_sync_master_slave_lag_wait_sessions++;
+    int wait_result = mysql_cond_timedwait(&COND_slave_lag_, &LOCK_slave_lag_,
+                                           &end_time);
+    rpl_semi_sync_master_slave_lag_wait_sessions--;
+
+    bool thd_was_killed = thd_kill_level(NULL) != THD_IS_NOT_KILLED;
+    if (wait_result != 0 || thd_was_killed)
+    {
+      break;
+    }
+  }
+
+  if (thd_kill_level(NULL) != THD_IS_NOT_KILLED)
+  {
+    /* Return error to client. */
+    error = 1;
+    my_printf_error(ER_ERROR_DURING_COMMIT,
+                    "Killed while waiting for replication semi-sync slave-lag.",
+                    MYF(0));
+  }
+  else if (lag > max_lag)
+  {
+    /* Return error to client. */
+    error = 1;
+    my_printf_error(ER_ERROR_DURING_COMMIT,
+                    "Slave-lag timeout",
+                    MYF(0));
+  }
+
+  if (waited)
+  {
+    rpl_semi_sync_master_trx_slave_lag_wait_num++;
+    rpl_semi_sync_master_trx_slave_lag_wait_time +=
+        (my_hrtime().val - start_time_us);
+  }
+
+  /* The lock held will be released by thd_exit_cond, so no need to
+     call unlock() here */
+  THD_EXIT_COND(NULL, & old_stage);
+
+  return error;
+}
diff --git a/plugin/semisync/semisync_master.h b/plugin/semisync/semisync_master.h
index d9dc4ce..e5c0de6 100644
--- a/plugin/semisync/semisync_master.h
+++ b/plugin/semisync/semisync_master.h
@@ -24,17 +24,101 @@
 #ifdef HAVE_PSI_INTERFACE
 extern PSI_mutex_key key_ss_mutex_LOCK_binlog_;
 extern PSI_cond_key key_ss_cond_COND_binlog_send_;
+
+extern PSI_mutex_key key_ss_mutex_LOCK_slave_lag_;
+extern PSI_cond_key key_ss_cond_COND_slave_lag_;
 #endif
 
 extern PSI_stage_info stage_waiting_for_semi_sync_ack_from_slave;
+extern PSI_stage_info stage_waiting_for_semi_sync_slave_lag;
 
 struct TranxNode {
   char             log_name_[FN_REFLEN];
-  my_off_t          log_pos_;
+  my_off_t         log_pos_;
+  ulonglong        tranx_commit_time_us;
   struct TranxNode *next_;            /* the next node in the sorted list */
   struct TranxNode *hash_next_;    /* the next node during hash collision */
 };
 
+struct LogPos;
+
+/* This represent a log position */
+struct LogPosPtr {
+  LogPosPtr() { Uninit();}
+  LogPosPtr(const char *name, my_off_t pos) : file_name(name), file_pos(pos){}
+  explicit LogPosPtr(const LogPos& pos) { Assign(&pos); }
+
+  const char *file_name;
+  my_off_t    file_pos;
+
+  LogPosPtr& Assign(const LogPosPtr *src) {
+    file_name = src->file_name;
+    file_pos = src->file_pos;
+    return *this;
+  }
+
+  LogPosPtr& Assign(const LogPos *src);
+
+  void Uninit() { file_name = NULL;}
+  bool IsInited() const { return file_name != NULL; }
+};
+
+struct LogPos {
+  char     file_name[FN_REFLEN];
+  my_off_t file_pos;
+
+  LogPos() { Uninit(); }
+
+  LogPosPtr ToLogPosPtr() const {
+    if (IsInited()){
+      LogPosPtr p(file_name, file_pos);
+      return p;
+    } else {
+      LogPosPtr p;
+      return p;
+    }
+  }
+
+  LogPos& Assign(const LogPosPtr *src) {
+    if (src->IsInited()) {
+      strcpy(file_name, src->file_name);
+      file_pos = src->file_pos;
+    } else {
+      Uninit();
+    }
+    return *this;
+  }
+
+  LogPos& Assign(const LogPos* src) {
+    LogPosPtr p = src->ToLogPosPtr();
+    Assign(&p);
+    return *this;
+  }
+
+  void Uninit() { file_name[0] = 0; }
+  bool IsInited() const { return file_name[0] != 0; }
+};
+
+inline LogPosPtr& LogPosPtr::Assign(const LogPos* src) {
+  LogPosPtr p = src->ToLogPosPtr();
+  Assign(&p);
+  return *this;
+}
+
+inline int CompareLogPos(const LogPosPtr *pos1, const LogPosPtr *pos2) {
+  int cmp = strcmp(pos1->file_name, pos2->file_name);
+
+  if (cmp != 0)
+    return cmp;
+
+  if (pos1->file_pos > pos2->file_pos)
+    return 1;
+  else if (pos1->file_pos < pos2->file_pos)
+    return -1;
+  else
+    return 0;
+}
+
 /**
   @class TranxNodeAllocator
 
@@ -329,10 +413,14 @@ class ActiveTranx
                    node2->log_name_, node2->log_pos_);
   }
 
+  void set_new_front(TranxNode* new_front);
+
 public:
   ActiveTranx(mysql_mutex_t *lock, unsigned long trace_level);
   ~ActiveTranx();
 
+  bool is_empty() const { return trx_front_ == NULL; }
+
   /* Insert an active transaction node with the specified position.
    *
    * Return:
@@ -340,21 +428,42 @@ class ActiveTranx
    */
   int insert_tranx_node(const char *log_file_name, my_off_t log_file_pos);
 
-  /* Clear the active transaction nodes until(inclusive) the specified
-   * position.
-   * If log_file_name is NULL, everything will be cleared: the sorted
+  /* Clear the active transaction
+   * Everything will be cleared: the sorted
    * list and the hash table will be reset to empty.
-   * 
+   *
    * Return:
-   *  0: success;  non-zero: error
+   *   0 success; non-zero: error
+   */
+  int clear_active_tranx_nodes();
+
+  /* Prune the active transaction nodes until the specified
+   * position (inclusive).
+   *
+   * Return:
+   *   true  if any transaction was removed
+   *   false if list was left unchanged
    */
-  int clear_active_tranx_nodes(const char *log_file_name,
-                               my_off_t    log_file_pos);
+  bool prune_active_tranx_nodes(LogPosPtr logpos,
+                                ulonglong *oldest_tranx_commit_time_us);
 
-  /* Given a position, check to see whether the position is an active
-   * transaction's ending position by probing the hash table.
+  /* Lookup a transaction's ending position by probing the hash table.
+   *
+   * return  entry if found or NULL otherwise
    */
-  bool is_tranx_end_pos(const char *log_file_name, my_off_t log_file_pos);
+  TranxNode* lookup_tranx_end_pos(const char *log_file_name,
+                                  my_off_t log_file_pos);
+
+
+  /* Check if an entry is rear (i.e last) */
+  bool is_rear(TranxNode* entry) const { return entry == trx_rear_; }
+
+  /**
+   * return timestamp of oldest transaction in list
+   */
+  ulonglong get_oldest_tranx_commit_time_us() const {
+    return trx_front_->tranx_commit_time_us;
+  }
 
   /* Given two binlog positions, compare which one is bigger based on
    * (file_name, file_position).
@@ -365,6 +474,24 @@ class ActiveTranx
 };
 
 /**
+ * State that semisync master keeps per slave
+ */
+struct ReplSemiSyncMasterPerSlaveState
+{
+  ReplSemiSyncMasterPerSlaveState() : unacked_event_count_(0) {}
+
+  /**
+   * No of events that has not been semi-sync acked
+   */
+  unsigned unacked_event_count_;
+
+  /**
+   * Position of last event that was semisync'ed
+   */
+  LogPos sync_req_pos_;
+};
+
+/**
    The extension class for the master of semi-synchronous replication
 */
 class ReplSemiSyncMaster
@@ -432,6 +559,16 @@ class ReplSemiSyncMaster
 
   bool            state_;                    /* whether semi-sync is switched */
 
+  /* This cond variable is signaled when slave lag has decreased */
+  mysql_cond_t COND_slave_lag_;
+
+  /* Mutex that protects oldest_unapplied_tranx_commit_time_us */
+  mysql_mutex_t LOCK_slave_lag_;
+
+  /* this is commit time of oldest transaction that has not been applied
+   * on any slave */
+  ulonglong oldest_unapplied_tranx_commit_time_us_;
+
   void lock();
   void unlock();
   void cond_broadcast();
@@ -493,6 +630,9 @@ class ReplSemiSyncMaster
   /* Is the slave servered by the thread requested semi-sync */
   bool is_semi_sync_slave();
 
+  /* Does this slave have slave lag reporting capabilities */
+  bool has_semi_sync_slave_lag();
+
   /* In semi-sync replication, reports up to which binlog position we have
    * received replies from the slave indicating that it already get the events.
    *
@@ -501,13 +641,15 @@ class ReplSemiSyncMaster
    *  log_file_name - (IN)  binlog file name
    *  end_offset    - (IN)  the offset in the binlog file up to which we have
    *                        the replies from the slave
+   *  exec_position - (IN)  position of SQL thread or NULL if not present
    *
    * Return:
    *  0: success;  non-zero: error
    */
   int reportReplyBinlog(uint32 server_id,
                         const char* log_file_name,
-                        my_off_t end_offset);
+                        my_off_t end_offset,
+                        const LogPos *exec_position);
 
   /* Commit a transaction in the final step.  This function is called from
    * InnoDB before returning from the low commit.  If semi-sync is switch on,
@@ -540,6 +682,16 @@ class ReplSemiSyncMaster
    */
   int reserveSyncHeader(unsigned char *header, unsigned long size);
 
+  /*
+   * check if an event should be semi synced and optionally
+   * if it should report back position of SQL thread on slave
+   *
+   * return 0 - no semi sync
+   *        1 - semi sync
+   *        2 - semi sync and report exec position
+   */
+  int checkSyncReq(const LogPosPtr *log_pos);
+
   /* Update the sync bit in the packet header to indicate to the slave whether
    * the master will wait for the reply of the event.  If semi-sync is switched
    * off and we detect that the slave is catching up, we switch semi-sync on.
@@ -592,6 +744,21 @@ class ReplSemiSyncMaster
    * go off for that.
    */
   int resetMaster();
+
+  /**
+   * wake potential slave-lag waiters
+   *   called by binlog dump-thread(s)
+   */
+  void wake_slave_lag_waiters(ulonglong oldest_unapplied_tranx_commit_time_us);
+
+  /**
+   * wait for slave lag to get below threshold
+   *   called by user-thread(s)
+   *
+   * return 0 - success
+   *        1 - timeout
+   */
+  int wait_slave_lag(ulong max_wait_time_sec);
 };
 
 enum rpl_semi_sync_master_wait_point_t {
@@ -621,6 +788,17 @@ extern unsigned long long rpl_semi_sync_master_trx_wait_num;
 extern unsigned long long rpl_semi_sync_master_net_wait_time;
 extern unsigned long long rpl_semi_sync_master_trx_wait_time;
 
+extern unsigned long rpl_semi_sync_master_max_unacked_event_count;
+extern unsigned long rpl_semi_sync_master_max_unacked_event_bytes;
+extern unsigned long rpl_semi_sync_master_max_slave_lag;
+extern unsigned long rpl_semi_sync_master_slave_lag_heartbeat_frequency_us;
+extern unsigned long rpl_semi_sync_master_slave_lag_wait_sessions;
+extern unsigned long long rpl_semi_sync_master_estimated_slave_lag;
+
+extern unsigned long rpl_semi_sync_master_avg_trx_slave_lag_wait_time;
+extern unsigned long long rpl_semi_sync_master_trx_slave_lag_wait_num;
+extern unsigned long long rpl_semi_sync_master_trx_slave_lag_wait_time;
+
 /*
   This indicates whether we should keep waiting if no semi-sync slave
   is available.
diff --git a/plugin/semisync/semisync_master_plugin.cc b/plugin/semisync/semisync_master_plugin.cc
index 7bb0eea..3282158 100644
--- a/plugin/semisync/semisync_master_plugin.cc
+++ b/plugin/semisync/semisync_master_plugin.cc
@@ -21,6 +21,11 @@
 
 static ReplSemiSyncMaster repl_semisync;
 
+// forward declaration
+static inline ulong get_slave_lag_wait_timeout(THD* thd);
+
+static char rpl_semi_sync_master_group_commit = 0;
+
 C_MODE_START
 
 int repl_semi_report_binlog_update(Binlog_storage_param *param,
@@ -31,6 +36,13 @@ int repl_semi_report_binlog_update(Binlog_storage_param *param,
 
   if (repl_semisync.getMasterEnabled())
   {
+    if (rpl_semi_sync_master_group_commit &&
+        ((flags & BINLOG_GROUP_COMMIT_TRAILER) == 0))
+    {
+      /** there are transactions more coming... */
+      return 0;
+    }
+
     /*
       Let us store the binlog file name and the position, so that
       we know how long to wait for the binlog to the replicated to
@@ -43,8 +55,11 @@ int repl_semi_report_binlog_update(Binlog_storage_param *param,
   return error;
 }
 
-int repl_semi_request_commit(Trans_param *param)
+int repl_semi_before_commit(Trans_param *param, int *error)
 {
+  *error = repl_semisync.wait_slave_lag(
+      get_slave_lag_wait_timeout(current_thd));
+
   return 0;
 }
 
@@ -53,6 +68,14 @@ int repl_semi_report_binlog_sync(Binlog_storage_param *param,
                                  my_off_t log_pos, uint32 flags)
 {
   int error= 0;
+
+  if (rpl_semi_sync_master_group_commit &&
+      ((flags & BINLOG_GROUP_COMMIT_TRAILER) == 0))
+  {
+    /** there are transactions more coming... */
+    return 0;
+  }
+
   if (rpl_semi_sync_master_wait_point ==
       SEMI_SYNC_MASTER_WAIT_POINT_AFTER_BINLOG_SYNC)
   {
@@ -100,7 +123,7 @@ int repl_semi_binlog_dump_start(Binlog_transmit_param *param,
       Let's assume this semi-sync slave has already received all
       binlog events before the filename and position it requests.
     */
-    repl_semisync.reportReplyBinlog(param->server_id, log_file, log_pos);
+    repl_semisync.reportReplyBinlog(param->server_id, log_file, log_pos, NULL);
   }
   sql_print_information("Start %s binlog_dump to slave (server_id: %d), pos(%s, %lu)",
 			semi_sync_slave ? "semi-sync" : "asynchronous",
@@ -242,15 +265,72 @@ static MYSQL_SYSVAR_ULONG(trace_level, rpl_semi_sync_master_trace_level,
   &fix_rpl_semi_sync_master_trace_level, // update
   32, 0, ~0UL, 1);
 
+static MYSQL_SYSVAR_ULONG(max_unacked_event_count,
+  rpl_semi_sync_master_max_unacked_event_count,
+  PLUGIN_VAR_OPCMDARG,
+  "Maximum unacked replication events",
+  NULL, // check
+  NULL, // update
+  rpl_semi_sync_master_max_unacked_event_count, 0, ~0UL, 1);
+
+static MYSQL_SYSVAR_ULONG(max_unacked_event_bytes,
+  rpl_semi_sync_master_max_unacked_event_bytes,
+  PLUGIN_VAR_OPCMDARG,
+  "Maximum unacked replication bytes",
+  NULL, // check
+  NULL, // update
+  rpl_semi_sync_master_max_unacked_event_bytes, 0, ~0UL, 1);
+
+static MYSQL_SYSVAR_ULONG(max_slave_lag, rpl_semi_sync_master_max_slave_lag,
+  PLUGIN_VAR_OPCMDARG,
+  "Maximum allowed lag of fastest semi-sync slave (in seconds), "
+  "checked before commit.",
+  NULL, // check
+  NULL, // update
+  rpl_semi_sync_master_max_slave_lag, 0, ~0UL, 1);
+
+static MYSQL_THDVAR_ULONG(slave_lag_wait_timeout,
+  PLUGIN_VAR_RQCMDARG,
+  "Timeout in seconds a rw-transaction may wait for max slave lag before "
+  "being rolled back.",
+  NULL, NULL, 50, 1, 1024 * 1024 * 1024, 0);
+
+static MYSQL_SYSVAR_ULONG(
+    slave_lag_heartbeat_frequency_us,
+    rpl_semi_sync_master_slave_lag_heartbeat_frequency_us,
+    PLUGIN_VAR_RQCMDARG,
+    "Heartbeat frequency when slave-lag is enabled (in microseconds).",
+    NULL, // check
+    NULL, // update
+    500000, /* 500 ms */
+    1, ~0UL, 1);
+
+static MYSQL_SYSVAR_BOOL(group_commit, rpl_semi_sync_master_group_commit,
+  PLUGIN_VAR_OPCMDARG,
+ "Group commit for semi sync",
+  NULL, 			// check
+  NULL,
+  0);
+
 static SYS_VAR* semi_sync_master_system_vars[]= {
   MYSQL_SYSVAR(enabled),
   MYSQL_SYSVAR(wait_point),
   MYSQL_SYSVAR(timeout),
   MYSQL_SYSVAR(wait_no_slave),
   MYSQL_SYSVAR(trace_level),
+  MYSQL_SYSVAR(max_unacked_event_count),
+  MYSQL_SYSVAR(max_unacked_event_bytes),
+  MYSQL_SYSVAR(max_slave_lag),
+  MYSQL_SYSVAR(slave_lag_wait_timeout),
+  MYSQL_SYSVAR(slave_lag_heartbeat_frequency_us),
+  MYSQL_SYSVAR(group_commit),
   NULL,
 };
 
+static inline ulong get_slave_lag_wait_timeout(THD* thd)
+{
+  return THDVAR(thd, slave_lag_wait_timeout);
+}
 
 static void fix_rpl_semi_sync_master_timeout(MYSQL_THD thd,
 				      SYS_VAR *var,
@@ -297,6 +377,7 @@ Trans_observer trans_observer = {
 
   repl_semi_report_commit,	// after_commit
   repl_semi_report_rollback,	// after_rollback
+  repl_semi_before_commit,	// before commit
 };
 
 Binlog_storage_observer storage_observer = {
@@ -339,7 +420,11 @@ DEF_SHOW_FUNC(net_wait_time, SHOW_LONGLONG)
 DEF_SHOW_FUNC(net_wait_num, SHOW_LONGLONG)
 DEF_SHOW_FUNC(avg_net_wait_time, SHOW_LONG)
 DEF_SHOW_FUNC(avg_trx_wait_time, SHOW_LONG)
-
+DEF_SHOW_FUNC(slave_lag_wait_sessions, SHOW_LONG)
+DEF_SHOW_FUNC(estimated_slave_lag, SHOW_LONGLONG)
+DEF_SHOW_FUNC(trx_slave_lag_wait_time, SHOW_LONGLONG)
+DEF_SHOW_FUNC(trx_slave_lag_wait_num, SHOW_LONGLONG)
+DEF_SHOW_FUNC(avg_trx_slave_lag_wait_time, SHOW_LONG)
 
 /* plugin status variables */
 static SHOW_VAR semi_sync_master_status_vars[]= {
@@ -385,32 +470,55 @@ static SHOW_VAR semi_sync_master_status_vars[]= {
   {"Rpl_semi_sync_master_net_avg_wait_time",
    (char*) &SHOW_FNAME(avg_net_wait_time),
    SHOW_SIMPLE_FUNC},
+  {"Rpl_semi_sync_master_slave_lag_wait_sessions",
+   (char*) &SHOW_FNAME(slave_lag_wait_sessions),
+   SHOW_SIMPLE_FUNC},
+  {"Rpl_semi_sync_master_estimated_slave_lag",
+   (char*) &SHOW_FNAME(estimated_slave_lag),
+   SHOW_SIMPLE_FUNC},
+  {"Rpl_semi_sync_master_tx_slave_lag_wait_time",
+   (char*) &SHOW_FNAME(trx_slave_lag_wait_time),
+   SHOW_SIMPLE_FUNC},
+  {"Rpl_semi_sync_master_tx_slave_lag_waits",
+   (char*) &SHOW_FNAME(trx_slave_lag_wait_num),
+   SHOW_SIMPLE_FUNC},
+  {"Rpl_semi_sync_master_tx_avg_slave_lag_wait_time",
+   (char*) &SHOW_FNAME(avg_trx_slave_lag_wait_time),
+   SHOW_SIMPLE_FUNC},
   {NULL, NULL, SHOW_LONG},
 };
 
 #ifdef HAVE_PSI_INTERFACE
 PSI_mutex_key key_ss_mutex_LOCK_binlog_;
+PSI_mutex_key key_ss_mutex_LOCK_slave_lag_;
 
 static PSI_mutex_info all_semisync_mutexes[]=
 {
-  { &key_ss_mutex_LOCK_binlog_, "LOCK_binlog_", 0}
+  { &key_ss_mutex_LOCK_binlog_, "LOCK_binlog_", 0 },
+  { &key_ss_mutex_LOCK_slave_lag_, "LOCK_slave_lag_", 0 }
 };
 
 PSI_cond_key key_ss_cond_COND_binlog_send_;
+PSI_cond_key key_ss_cond_COND_slave_lag_;
 
 static PSI_cond_info all_semisync_conds[]=
 {
-  { &key_ss_cond_COND_binlog_send_, "COND_binlog_send_", 0}
+  { &key_ss_cond_COND_binlog_send_, "COND_binlog_send_", 0 },
+  { &key_ss_cond_COND_slave_lag_, "COND_slave_lag_", 0 }
 };
 #endif /* HAVE_PSI_INTERFACE */
 
 PSI_stage_info stage_waiting_for_semi_sync_ack_from_slave=
 { 0, "Waiting for semi-sync ACK from slave", 0};
 
+PSI_stage_info stage_waiting_for_semi_sync_slave_lag=
+{ 0, "Waiting for semi-sync slave lag", 0};
+
 #ifdef HAVE_PSI_INTERFACE
 PSI_stage_info *all_semisync_stages[]=
 {
-  & stage_waiting_for_semi_sync_ack_from_slave
+  & stage_waiting_for_semi_sync_ack_from_slave,
+  & stage_waiting_for_semi_sync_slave_lag
 };
 
 static void init_semisync_psi_keys(void)
@@ -492,4 +600,3 @@ maria_declare_plugin(semisync_master)
   MariaDB_PLUGIN_MATURITY_GAMMA
 }
 maria_declare_plugin_end;
-
diff --git a/plugin/semisync/semisync_slave.cc b/plugin/semisync/semisync_slave.cc
index 5f98472..839e0cc 100644
--- a/plugin/semisync/semisync_slave.cc
+++ b/plugin/semisync/semisync_slave.cc
@@ -20,6 +20,7 @@
 char rpl_semi_sync_slave_enabled;
 char rpl_semi_sync_slave_status= 0;
 unsigned long rpl_semi_sync_slave_trace_level;
+char rpl_semi_sync_slave_lag_enabled= 0;
 
 int ReplSemiSyncSlave::initObject()
 {
@@ -42,7 +43,7 @@ int ReplSemiSyncSlave::initObject()
 
 int ReplSemiSyncSlave::slaveReadSyncHeader(const char *header,
                                       unsigned long total_len,
-                                      bool  *need_reply,
+                                      unsigned char *need_reply,
                                       const char **payload,
                                       unsigned long *payload_len)
 {
@@ -52,7 +53,7 @@ int ReplSemiSyncSlave::slaveReadSyncHeader(const char *header,
 
   if ((unsigned char)(header[0]) == kPacketMagicNum)
   {
-    *need_reply  = (header[1] & kPacketFlagSync);
+    *need_reply  = (header[1] & (kPacketFlagSync | kPacketFlagSyncAndReport));
     *payload_len = total_len - 2;
     *payload     = header + 2;
 
@@ -95,16 +96,20 @@ int ReplSemiSyncSlave::slaveStop(Binlog_relay_IO_param *param)
   return 0;
 }
 
-int ReplSemiSyncSlave::slaveReply(MYSQL *mysql,
-                                 const char *binlog_filename,
-                                 my_off_t binlog_filepos)
+int ReplSemiSyncSlave::slaveReply(unsigned char header_byte,
+                                  MYSQL *mysql,
+                                  const char *binlog_filename,
+                                  my_off_t binlog_filepos,
+                                  Master_info * mi)
 {
   const char *kWho = "ReplSemiSyncSlave::slaveReply";
   NET *net= &mysql->net;
-  uchar reply_buffer[REPLY_MAGIC_NUM_LEN
-                     + REPLY_BINLOG_POS_LEN
-                     + REPLY_BINLOG_NAME_LEN];
+  uchar reply_buffer[REPLY_MAGIC_NUM_LEN +
+                     2 * ( REPLY_BINLOG_POS_LEN +
+                           REPLY_BINLOG_NAME_LEN +
+                           /* '\0' */ 1) ];
   int  reply_res, name_len = strlen(binlog_filename);
+  int  msg_len = name_len + REPLY_BINLOG_NAME_OFFSET;
 
   function_enter(kWho);
 
@@ -119,10 +124,29 @@ int ReplSemiSyncSlave::slaveReply(MYSQL *mysql,
     sql_print_information("%s: reply (%s, %lu)", kWho,
                           binlog_filename, (ulong)binlog_filepos);
 
+  if (header_byte & kPacketFlagSyncAndReport)
+  {
+    /**
+     * master requests that we also report back SQL-thread position
+     */
+
+    // where to store sql filename/position
+    char *bufptr = (char*)reply_buffer + msg_len;
+    bufptr[0] = 0; // '\0' terminate previous filename
+    bufptr++;
+
+    my_off_t sql_file_pos;
+    // get file/position and store the filename directly info bufptr+8
+    size_t name_len2 = get_master_log_pos(mi, bufptr + 8, &sql_file_pos);
+    int8store(bufptr, sql_file_pos); // store position
+
+    msg_len += /* '\0' */ 1 + /* position */ 8 + name_len2 + /* '\0' */ 1;
+  }
+
   net_clear(net, 0);
   /* Send the reply. */
-  reply_res = my_net_write(net, reply_buffer,
-                           name_len + REPLY_BINLOG_NAME_OFFSET);
+  reply_res = my_net_write(net, reply_buffer, msg_len);
+
   if (!reply_res)
   {
     reply_res = net_flush(net);
diff --git a/plugin/semisync/semisync_slave.h b/plugin/semisync/semisync_slave.h
index 1bf8cf3..c91847d 100644
--- a/plugin/semisync/semisync_slave.h
+++ b/plugin/semisync/semisync_slave.h
@@ -60,23 +60,30 @@ class ReplSemiSyncSlave
    * Return:
    *  0: success;  non-zero: error
    */
-  int slaveReadSyncHeader(const char *header, unsigned long total_len, bool *need_reply,
+  int slaveReadSyncHeader(const char *header, unsigned long total_len,
+                          unsigned char *need_reply_byte,
                           const char **payload, unsigned long *payload_len);
 
   /* A slave replies to the master indicating its replication process.  It
    * indicates that the slave has received all events before the specified
    * binlog position.
-   * 
+   *
    * Input:
+   *  need_reply_byte  - (IN)  the header byte
    *  mysql            - (IN)  the mysql network connection
    *  binlog_filename  - (IN)  the reply point's binlog file name
    *  binlog_filepos   - (IN)  the reply point's binlog file offset
+   *  master_info      - (IN)  the master info struct so that we can get more
+   *                           info if needed
    *
    * Return:
    *  0: success;  non-zero: error
    */
-  int slaveReply(MYSQL *mysql, const char *binlog_filename,
-                 my_off_t binlog_filepos);
+  int slaveReply(unsigned char need_reply_byte,
+                 MYSQL *mysql,
+                 const char *binlog_filename,
+                 my_off_t binlog_filepos,
+                 Master_info* master_info);
 
   int slaveStart(Binlog_relay_IO_param *param);
   int slaveStop(Binlog_relay_IO_param *param);
@@ -93,5 +100,6 @@ class ReplSemiSyncSlave
 extern char rpl_semi_sync_slave_enabled;
 extern unsigned long rpl_semi_sync_slave_trace_level;
 extern char rpl_semi_sync_slave_status;
+extern char rpl_semi_sync_slave_lag_enabled;
 
 #endif /* SEMISYNC_SLAVE_H */
diff --git a/plugin/semisync/semisync_slave_plugin.cc b/plugin/semisync/semisync_slave_plugin.cc
index 572ead2..0bf03be 100644
--- a/plugin/semisync/semisync_slave_plugin.cc
+++ b/plugin/semisync/semisync_slave_plugin.cc
@@ -28,7 +28,7 @@ static ReplSemiSyncSlave repl_semisync;
   event read is the last event of a transaction. And the value is
   checked in repl_semi_slave_queue_event.
 */
-bool semi_sync_need_reply= false;
+unsigned char semi_sync_need_reply= 0;
 
 C_MODE_START
 
@@ -81,6 +81,23 @@ int repl_semi_slave_request_dump(Binlog_relay_IO_param *param,
     return 1;
   }
   mysql_free_result(mysql_store_result(mysql));
+
+  if (rpl_semi_sync_slave_lag_enabled)
+  {
+    char buf[100];
+    /*
+      Tell master that we can do exec-position reporting
+    */
+    snprintf(buf, sizeof(buf), "SET @%s= 1",
+             ReplSemiSyncBase::kRplSemiSyncSlaveReportExec);
+    if (mysql_real_query(mysql, buf, strlen(buf)))
+    {
+      sql_print_error("query: %s on master failed", buf);
+      return 1;
+    }
+    mysql_free_result(mysql_store_result(mysql));
+  }
+
   rpl_semi_sync_slave_status= 1;
   return 0;
 }
@@ -110,9 +127,11 @@ int repl_semi_slave_queue_event(Binlog_relay_IO_param *param,
       should not cause the slave IO thread to stop, and the error
       messages are already reported.
     */
-    (void) repl_semisync.slaveReply(param->mysql,
+    (void) repl_semisync.slaveReply(semi_sync_need_reply,
+                                    param->mysql,
                                     param->master_log_name,
-                                    param->master_log_pos);
+                                    param->master_log_pos,
+                                    param->mi);
   }
   return 0;
 }
@@ -164,9 +183,17 @@ static MYSQL_SYSVAR_ULONG(trace_level, rpl_semi_sync_slave_trace_level,
   &fix_rpl_semi_sync_trace_level, // update
   32, 0, ~0UL, 1);
 
+static MYSQL_SYSVAR_BOOL(lag_enabled, rpl_semi_sync_slave_lag_enabled,
+  PLUGIN_VAR_OPCMDARG,
+  "Enable semi-synchronous replication slave lag reporting. ",
+  NULL, // check
+  NULL, // update
+  0);
+
 static SYS_VAR* semi_sync_slave_system_vars[]= {
   MYSQL_SYSVAR(enabled),
   MYSQL_SYSVAR(trace_level),
+  MYSQL_SYSVAR(lag_enabled),
   NULL,
 };
 
@@ -230,4 +257,3 @@ maria_declare_plugin(semisync_slave)
   MariaDB_PLUGIN_MATURITY_GAMMA
 }
 maria_declare_plugin_end;
-
diff --git a/sql/handler.cc b/sql/handler.cc
index 3ca9ec3..3e6cd65 100644
--- a/sql/handler.cc
+++ b/sql/handler.cc
@@ -1364,12 +1364,30 @@ int ha_commit_trans(THD *thd, bool all)
   uint rw_ha_count= ha_check_and_coalesce_trx_read_only(thd, ha_info, all);
   /* rw_trans is TRUE when we in a transaction changing data */
   bool rw_trans= is_real_trans && (rw_ha_count > 0);
+  bool mdl_request_initialized= false;
   MDL_request mdl_request;
   DBUG_PRINT("info", ("is_real_trans: %d  rw_trans:  %d  rw_ha_count: %d",
                       is_real_trans, rw_trans, rw_ha_count));
 
   if (rw_trans)
   {
+    /* check READ-ONLY just before before_commit hook to decrease likelihood
+     * of having threads hanging waiting for slave-lag only to be aborted
+     * due to read-only.
+     */
+    if (opt_readonly &&
+        !(thd->security_ctx->master_access & SUPER_ACL) &&
+        !thd->slave_thread)
+    {
+      my_error(ER_OPTION_PREVENTS_STATEMENT, MYF(0), "--read-only");
+      goto err;
+    }
+
+    if (RUN_HOOK(transaction, before_commit, (thd)))
+    {
+      goto err;
+    }
+
     /*
       Acquire a metadata lock which will ensure that COMMIT is blocked
       by an active FLUSH TABLES WITH READ LOCK (and vice versa:
@@ -1378,6 +1396,7 @@ int ha_commit_trans(THD *thd, bool all)
       We allow the owner of FTWRL to COMMIT; we assume that it knows
       what it does.
     */
+    mdl_request_initialized= true;
     mdl_request.init(MDL_key::COMMIT, "", "", MDL_INTENTION_EXCLUSIVE,
                      MDL_EXPLICIT);
 
@@ -1486,7 +1505,7 @@ int ha_commit_trans(THD *thd, bool all)
     ha_rollback_trans(thd, all);
 
 end:
-  if (rw_trans && mdl_request.ticket)
+  if (rw_trans && mdl_request_initialized && mdl_request.ticket)
   {
     /*
       We do not always immediately release transactional locks
diff --git a/sql/replication.h b/sql/replication.h
index 4731c22..309bdb4 100644
--- a/sql/replication.h
+++ b/sql/replication.h
@@ -110,6 +110,21 @@ typedef struct Trans_observer {
      @retval 1 Failure
   */
   int (*after_rollback)(Trans_param *param);
+
+  /**
+     This callback is called before transaction commit
+     If function does not return *error == 0 transaction will
+     not be committed but error code will be returned to client
+
+     @note *error!=0 and return code 0 shall be used by plugin to signal
+     that transaction should be aborted.
+     If returning non-zero transaction will also be aborted and an error
+     will be printed to error log.
+
+     @retval 0 Sucess
+     @retval non-zero error
+  */
+  int (*before_commit)(Trans_param *param, int *error);
 } Trans_observer;
 
 /**
@@ -294,6 +309,8 @@ enum Binlog_relay_IO_flags {
 };
 
 
+class Master_info;
+
 /**
   Replication binlog relay IO observer parameter
 */
@@ -309,8 +326,20 @@ typedef struct Binlog_relay_IO_param {
   my_off_t master_log_pos;
 
   MYSQL *mysql;                        /* the connection to master */
+
+  Master_info * mi;                    /* master info handle */
 } Binlog_relay_IO_param;
 
+
+/* get the master log given a Master_info
+ * and store it in filename_buf/filepos
+ * return length of filename (excluding \0)
+ *
+ * note: filename_buf should be a minimum FN_REFLEN
+ */
+size_t get_master_log_pos(const Master_info *mi,
+                          char *filename_buf, my_off_t *filepos);
+
 /**
    Observes and extends the service of slave IO thread.
 */
@@ -561,7 +590,21 @@ int get_user_var_str(const char *name,
                      char *value, unsigned long len,
                      unsigned int precision, int *null_value);
 
-  
+
+/**
+   Set or replace the value of user variable as to an ulonglong
+
+   @param name      user variable name
+   @param value     the value
+   @param old_value pointer to where old value will be stored (or NULL)
+
+   @retval  0 Success, no prior value found
+   @retval  1 Success, old_value populated
+   @retval -1 Fail
+*/
+int set_user_var_int(const char *name,
+                     long long int value,
+                     long long int *old_value);
 
 #ifdef __cplusplus
 }
diff --git a/sql/rpl_handler.cc b/sql/rpl_handler.cc
index 3962600..209a809 100644
--- a/sql/rpl_handler.cc
+++ b/sql/rpl_handler.cc
@@ -23,6 +23,7 @@
 #include "rpl_filter.h"
 #include <my_dir.h>
 #include "rpl_handler.h"
+#include "sql_prepare.h"
 
 Trans_delegate *transaction_delegate;
 Binlog_storage_delegate *binlog_storage_delegate;
@@ -88,6 +89,42 @@ int get_user_var_str(const char *name, char *value,
   return 0;
 }
 
+int set_user_var_int(const char *name,
+                     long long int value,
+                     long long int *old_value)
+{
+  THD* thd= current_thd;
+  bool null_val;
+  user_var_entry *entry=
+      (user_var_entry*) my_hash_search(&thd->user_vars,
+                                       (uchar*) name, strlen(name));
+  if (entry != NULL)
+  {
+    if (old_value != NULL)
+      *old_value= entry->val_int(&null_val);
+  }
+
+  Ed_connection con(thd);
+
+  char buf[256];
+  int res= snprintf(buf, sizeof(buf), "SET @%s=%lld", name, value);
+  if (/* error */ res < 0 ||
+      /* truncated */ res >= sizeof(buf))
+  {
+    return -1;
+  }
+
+  LEX_STRING str;
+  lex_string_set(&str, buf);
+
+  if (con.execute_direct(str))
+  {
+    return -1;
+  }
+
+  return entry == NULL ? 0 : 1;
+}
+
 int delegates_init()
 {
   static my_aligned_storage<sizeof(Trans_delegate), MY_ALIGNOF(long)> trans_mem;
@@ -249,6 +286,17 @@ int Trans_delegate::after_rollback(THD *thd, bool all)
   return ret;
 }
 
+int Trans_delegate::before_commit(THD *thd)
+{
+  int ret= 0, error= 0;
+  Trans_param param;
+  param.flags= 0;
+  param.log_file= 0;
+  param.log_pos= 0;
+  FOREACH_OBSERVER(ret, before_commit, thd, (&param, &error));
+  return error;
+}
+
 int Binlog_storage_delegate::after_flush(THD *thd,
                                          const char *log_file,
                                          my_off_t log_pos,
@@ -374,17 +422,19 @@ int Binlog_transmit_delegate::reserve_header(THD *thd, ushort flags,
 
 int Binlog_transmit_delegate::before_send_event(THD *thd, ushort flags,
                                                 String *packet,
-                                                const char *log_file,
+                                                const char *log_file_path,
                                                 my_off_t log_pos)
 {
   Binlog_transmit_param param;
   param.flags= flags;
 
   int ret= 0;
+  const char* log_file_name= log_file_path != NULL ?
+      log_file_path + dirname_length(log_file_path) : NULL;
   FOREACH_OBSERVER(ret, before_send_event, false,
                    (&param, (uchar *)packet->c_ptr(),
                     packet->length(),
-                    log_file+dirname_length(log_file), log_pos));
+                    log_file_name, log_pos));
   return ret;
 }
 
@@ -414,6 +464,7 @@ int Binlog_transmit_delegate::after_reset_master(THD *thd, ushort flags)
 void Binlog_relay_IO_delegate::init_param(Binlog_relay_IO_param *param,
                                           Master_info *mi)
 {
+  param->mi = mi;
   param->mysql= mi->mysql;
   param->user= mi->user;
   param->host= mi->host;
@@ -540,6 +591,20 @@ int unregister_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void
 {
   return binlog_relay_io_delegate->remove_observer(observer, (st_plugin_int *)p);
 }
+
+/* get master log pos for a Master_info struct */
+size_t get_master_log_pos(const Master_info* mi,
+                          char *filename_buf, my_off_t *filepos)
+{
+  mysql_mutex_t *mutex= &mi->rli.data_lock;
+
+  mysql_mutex_lock(mutex);
+  *filepos= mi->rli.group_master_log_pos;
+  strncpy(filename_buf, mi->rli.group_master_log_name, FN_REFLEN);
+  mysql_mutex_unlock(mutex);
+  return strnlen(filename_buf, FN_REFLEN);
+}
+
 #else
 int register_binlog_transmit_observer(Binlog_transmit_observer *observer, void *p)
 {
@@ -560,4 +625,13 @@ int unregister_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void
 {
   return 0;
 }
+
+size_t get_master_log_pos(const Master_info* mi,
+                          char *filename_buf, my_off_t *filepos)
+{
+  *filepos= 0;
+  filename_buf[0]= 0;
+  return 0;
+}
+
 #endif /* HAVE_REPLICATION */
diff --git a/sql/rpl_handler.h b/sql/rpl_handler.h
index afcfd9d..5119ee4 100644
--- a/sql/rpl_handler.h
+++ b/sql/rpl_handler.h
@@ -142,7 +142,7 @@ class Trans_delegate
   :public Delegate {
 public:
   typedef Trans_observer Observer;
-  int before_commit(THD *thd, bool all);
+  int before_commit(THD *thd);
   int before_rollback(THD *thd, bool all);
   int after_commit(THD *thd, bool all);
   int after_rollback(THD *thd, bool all);
diff --git a/sql/rpl_rli.h b/sql/rpl_rli.h
index 56300c6..d60a122 100644
--- a/sql/rpl_rli.h
+++ b/sql/rpl_rli.h
@@ -154,7 +154,9 @@ class Relay_log_info : public Slave_reporting_capability
     standard lock acquisition order to avoid deadlocks:
     run_lock, data_lock, relay_log.LOCK_log, relay_log.LOCK_index
   */
-  mysql_mutex_t data_lock, run_lock;
+  mutable mysql_mutex_t data_lock;
+  mysql_mutex_t run_lock;
+
   /*
     start_cond is broadcast when SQL thread is started
     stop_cond - when stopped
diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc
index d9ae6ca..9662058 100644
--- a/sql/sql_repl.cc
+++ b/sql/sql_repl.cc
@@ -818,6 +818,13 @@ static int send_heartbeat_event(binlog_send_info *info,
     packet->append(b, sizeof(b));
   }
 
+  if (RUN_HOOK(binlog_transmit, before_send_event,
+               (info->thd, info->flags, packet, 0, 0)))
+  {
+    info->error= ER_UNKNOWN_ERROR;
+    DBUG_RETURN(-1);
+  }
+
   if (my_net_write(net, (uchar*) packet->ptr(), packet->length()) ||
       net_flush(net))
   {
@@ -825,6 +832,13 @@ static int send_heartbeat_event(binlog_send_info *info,
     DBUG_RETURN(-1);
   }
 
+  if (RUN_HOOK(binlog_transmit, after_send_event,
+               (info->thd, info->flags, packet)))
+  {
+    info->error= ER_UNKNOWN_ERROR;
+    DBUG_RETURN(-1);
+  }
+
   DBUG_RETURN(0);
 }
 
