diff --git a/mysql-test/suite/rpl/r/rpl_checksum.result b/mysql-test/suite/rpl/r/rpl_checksum.result
index d88258f..231afee 100644
--- a/mysql-test/suite/rpl/r/rpl_checksum.result
+++ b/mysql-test/suite/rpl/r/rpl_checksum.result
@@ -71,7 +71,7 @@ insert into t1 values (1) /* will not be applied on slave due to simulation */;
 set @@global.debug_dbug='d,simulate_slave_unaware_checksum';
 start slave;
 include/wait_for_slave_io_error.inc [errno=1236]
-Last_IO_Error = 'Got fatal error 1236 from master when reading data from binary log: 'Slave can not handle replication events with the checksum that master is configured to log; the first event 'master-bin.000009' at 367, the last event read from 'master-bin.000010' at 248, the last byte read from 'master-bin.000010' at 248.''
+Last_IO_Error = 'Got fatal error 1236 from master when reading data from binary log: 'Slave can not handle replication events with the checksum that master is configured to log; the first event 'master-bin.000009' at 367, the last event read from 'master-bin.000010' at 4, the last byte read from 'master-bin.000010' at 248.''
 select count(*) as zero from t1;
 zero
 0
diff --git a/sql/log.cc b/sql/log.cc
index 2c20ea3..edb4c07 100644
--- a/sql/log.cc
+++ b/sql/log.cc
@@ -3133,6 +3133,7 @@ void MYSQL_BIN_LOG::cleanup()
     mysql_mutex_destroy(&LOCK_index);
     mysql_mutex_destroy(&LOCK_xid_list);
     mysql_mutex_destroy(&LOCK_binlog_background_thread);
+    mysql_mutex_destroy(&LOCK_binlog_end_pos);
     mysql_cond_destroy(&update_cond);
     mysql_cond_destroy(&COND_queue_busy);
     mysql_cond_destroy(&COND_xid_list);
@@ -3178,6 +3179,9 @@ void MYSQL_BIN_LOG::init_pthread_objects()
                   &COND_binlog_background_thread, 0);
   mysql_cond_init(key_BINLOG_COND_binlog_background_thread_end,
                   &COND_binlog_background_thread_end, 0);
+
+  mysql_mutex_init(m_key_LOCK_binlog_end_pos, &LOCK_binlog_end_pos,
+                   MY_MUTEX_INIT_SLOW);
 }
 
 
@@ -3524,10 +3528,19 @@ bool MYSQL_BIN_LOG::open(const char *log_name,
     if (flush_io_cache(&log_file) ||
         mysql_file_sync(log_file.file, MYF(MY_WME|MY_SYNC_FILESIZE)))
       goto err;
-    mysql_mutex_lock(&LOCK_commit_ordered);
-    strmake_buf(last_commit_pos_file, log_file_name);
-    last_commit_pos_offset= my_b_tell(&log_file);
-    mysql_mutex_unlock(&LOCK_commit_ordered);
+
+    my_off_t offset= my_b_tell(&log_file);
+
+    if (!is_relay_log)
+    {
+      /* update binlog_end_pos so that it can be read by after sync hook */
+      reset_binlog_end_pos(log_file_name, offset);
+
+      mysql_mutex_lock(&LOCK_commit_ordered);
+      strmake_buf(last_commit_pos_file, log_file_name);
+      last_commit_pos_offset= offset;
+      mysql_mutex_unlock(&LOCK_commit_ordered);
+    }
 
     if (write_file_name_to_index_file)
     {
@@ -3632,6 +3645,7 @@ int MYSQL_BIN_LOG::get_current_log(LOG_INFO* linfo)
 
 int MYSQL_BIN_LOG::raw_get_current_log(LOG_INFO* linfo)
 {
+  mysql_mutex_assert_owner(&LOCK_log);
   strmake_buf(linfo->log_file_name, log_file_name);
   linfo->pos = my_b_tell(&log_file);
   return 0;
@@ -4797,6 +4811,20 @@ void MYSQL_BIN_LOG::make_log_name(char* buf, const char* log_ident)
 
 bool MYSQL_BIN_LOG::is_active(const char *log_file_name_arg)
 {
+  /**
+   * there should/must be mysql_mutex_assert_owner(&LOCK_log) here...
+   * but code violates this! (scary monsters and super creeps!)
+   *
+   * example stacktrace:
+   * #8  MYSQL_BIN_LOG::is_active
+   * #9  MYSQL_BIN_LOG::can_purge_log
+   * #10 MYSQL_BIN_LOG::purge_logs
+   * #11 MYSQL_BIN_LOG::purge_first_log
+   * #12 next_event
+   * #13 exec_relay_log_event
+   *
+   * I didn't investigate if this is ligit...(i.e if my comment is wrong)
+   */
   return !strcmp(log_file_name, log_file_name_arg);
 }
 
@@ -5359,6 +5387,7 @@ binlog_start_consistent_snapshot(handlerton *hton, THD *thd)
   binlog_cache_mngr *const cache_mngr= thd->binlog_setup_trx_data();
 
   /* Server layer calls us with LOCK_commit_ordered locked, so this is safe. */
+  mysql_mutex_assert_owner(&LOCK_commit_ordered);
   strmake_buf(cache_mngr->last_commit_pos_file, mysql_bin_log.last_commit_pos_file);
   cache_mngr->last_commit_pos_offset= mysql_bin_log.last_commit_pos_offset;
 
@@ -6013,6 +6042,14 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate)
         } 
         else
         {
+          /* update binlog_end_pos so it can be read by dump thread
+           *
+           * note: must be _after_ the RUN_HOOK(after_flush) or else
+           * semi-sync-plugin might not have put the transaction into
+           * it's list before dump-thread tries to send it
+           */
+          update_binlog_end_pos(offset);
+
           signal_update();
           if ((error= rotate(false, &check_purge)))
             check_purge= false;
@@ -6664,6 +6701,9 @@ bool MYSQL_BIN_LOG::write_incident(THD *thd)
     }
 
     offset= my_b_tell(&log_file);
+
+    update_binlog_end_pos(offset);
+
     /*
       Take mutex to protect against a reader seeing partial writes of 64-bit
       offset on 32-bit CPUs.
@@ -6709,6 +6749,9 @@ MYSQL_BIN_LOG::write_binlog_checkpoint_event_already_locked(const char *name,
   }
 
   offset= my_b_tell(&log_file);
+
+  update_binlog_end_pos(offset);
+
   /*
     Take mutex to protect against a reader seeing partial writes of 64-bit
     offset on 32-bit CPUs.
@@ -7335,7 +7378,8 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader)
       {
         if (!current->error &&
             RUN_HOOK(binlog_storage, after_flush,
-                (current->thd, log_file_name,
+                (current->thd,
+                 current->cache_mngr->last_commit_pos_file,
                  current->cache_mngr->last_commit_pos_offset, synced)))
         {
           current->error= ER_ERROR_ON_WRITE;
@@ -7347,6 +7391,14 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader)
           all_error= false;
       }
 
+      /* update binlog_end_pos so it can be read by dump thread
+       *
+       * note: must be _after_ the RUN_HOOK(after_flush) or else
+       * semi-sync-plugin might not have put the transaction into
+       * it's list before dump-thread tries to send it
+       */
+      update_binlog_end_pos(commit_offset);
+
       if (any_error)
         sql_print_error("Failed to run 'after_flush' hooks");
       if (!all_error)
@@ -7387,6 +7439,10 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader)
 
   DEBUG_SYNC(leader->thd, "commit_before_get_LOCK_commit_ordered");
   mysql_mutex_lock(&LOCK_commit_ordered);
+    /**
+     * TODO(jonaso): Check with Kristian,
+     * if we rotate:d above, this offset is "wrong"
+     */
   last_commit_pos_offset= commit_offset;
   /*
     We cannot unlock LOCK_log until we have locked LOCK_commit_ordered;
@@ -7625,6 +7681,7 @@ void MYSQL_BIN_LOG::wait_for_update_relay_log(THD* thd)
   PSI_stage_info old_stage;
   DBUG_ENTER("wait_for_update_relay_log");
 
+  mysql_mutex_assert_owner(&LOCK_log);
   thd->ENTER_COND(&update_cond, &LOCK_log,
                   &stage_slave_has_read_all_relay_log,
                   &old_stage);
@@ -7655,6 +7712,7 @@ int MYSQL_BIN_LOG::wait_for_update_bin_log(THD* thd,
   int ret= 0;
   DBUG_ENTER("wait_for_update_bin_log");
 
+  mysql_mutex_assert_owner(&LOCK_log);
   if (!timeout)
     mysql_cond_wait(&update_cond, &LOCK_log);
   else
@@ -7663,6 +7721,21 @@ int MYSQL_BIN_LOG::wait_for_update_bin_log(THD* thd,
   DBUG_RETURN(ret);
 }
 
+int MYSQL_BIN_LOG::wait_for_update_binlog_end_pos(THD* thd,
+                                                  struct timespec *timeout)
+{
+  int ret= 0;
+  DBUG_ENTER("wait_for_update_binlog_end_pos");
+
+  mysql_mutex_assert_owner(get_binlog_end_pos_lock());
+  if (!timeout)
+    mysql_cond_wait(&update_cond, get_binlog_end_pos_lock());
+  else
+    ret= mysql_cond_timedwait(&update_cond, get_binlog_end_pos_lock(),
+                              timeout);
+  DBUG_RETURN(ret);
+}
+
 
 /**
   Close the log file.
@@ -9703,6 +9776,14 @@ TC_LOG_BINLOG::set_status_variables(THD *thd)
   }
 }
 
+void assert_LOCK_log_owner(bool owner)
+{
+  if (owner)
+    mysql_mutex_assert_owner(mysql_bin_log.get_log_lock());
+  else
+    mysql_mutex_assert_not_owner(mysql_bin_log.get_log_lock());
+}
+
 struct st_mysql_storage_engine binlog_storage_engine=
 { MYSQL_HANDLERTON_INTERFACE_VERSION };
 
diff --git a/sql/log.h b/sql/log.h
index ac1e9f4..21af7d0 100644
--- a/sql/log.h
+++ b/sql/log.h
@@ -341,6 +341,8 @@ class MYSQL_LOG
   /** Instrumentation key to use for file io in @c log_file */
   PSI_file_key m_log_file_key;
 #endif
+  /* for documentation of mutexes held in various places in code */
+  friend void assert_LOCK_log_owner(bool owner);
 };
 
 class MYSQL_QUERY_LOG: public MYSQL_LOG
@@ -425,6 +427,9 @@ class MYSQL_BIN_LOG: public TC_LOG, private MYSQL_LOG
   PSI_file_key m_key_file_log_index;
 
   PSI_file_key m_key_COND_queue_busy;
+
+  /** The instrumentation key to use for @ LOCK_binlog_end_pos */
+  PSI_mutex_key m_key_LOCK_binlog_end_pos;
 #endif
 
   struct group_commit_entry
@@ -477,6 +482,7 @@ class MYSQL_BIN_LOG: public TC_LOG, private MYSQL_LOG
 
   /* LOCK_log and LOCK_index are inited by init_pthread_objects() */
   mysql_mutex_t LOCK_index;
+  mysql_mutex_t LOCK_binlog_end_pos;
   mysql_mutex_t LOCK_xid_list;
   mysql_cond_t  COND_xid_list;
   mysql_cond_t update_cond;
@@ -811,6 +817,67 @@ class MYSQL_BIN_LOG: public TC_LOG, private MYSQL_LOG
   int bump_seq_no_counter_if_needed(uint32 domain_id, uint64 seq_no);
   bool check_strict_gtid_sequence(uint32 domain_id, uint32 server_id,
                                   uint64 seq_no);
+
+
+  void update_binlog_end_pos(my_off_t pos)
+  {
+    mysql_mutex_assert_owner(&LOCK_log);
+    mysql_mutex_assert_not_owner(&LOCK_binlog_end_pos);
+    lock_binlog_end_pos();
+    /**
+     * note: it would make more sense to assert(pos > binlog_end_pos)
+     * but there are two places triggered by mtr that has pos == binlog_end_pos
+     * i didn't investigate but accepted as it should do no harm
+     */
+    DBUG_ASSERT(pos >= binlog_end_pos);
+    binlog_end_pos= pos;
+    signal_update();
+    unlock_binlog_end_pos();
+  }
+
+  /**
+   * used when opening new file, and binlog_end_pos moves backwards
+   */
+  void reset_binlog_end_pos(const char file_name[FN_REFLEN], my_off_t pos)
+  {
+    mysql_mutex_assert_owner(&LOCK_log);
+    mysql_mutex_assert_not_owner(&LOCK_binlog_end_pos);
+    lock_binlog_end_pos();
+    binlog_end_pos= pos;
+    strcpy(binlog_end_pos_file, file_name);
+    signal_update();
+    unlock_binlog_end_pos();
+  }
+
+  /*
+    It is called by the threads(e.g. dump thread) which want to read
+    log without LOCK_log protection.
+  */
+  my_off_t get_binlog_end_pos(char file_name_buf[FN_REFLEN]) const
+  {
+    mysql_mutex_assert_not_owner(&LOCK_log);
+    mysql_mutex_assert_owner(&LOCK_binlog_end_pos);
+    strcpy(file_name_buf, binlog_end_pos_file);
+    return binlog_end_pos;
+  }
+  void lock_binlog_end_pos() { mysql_mutex_lock(&LOCK_binlog_end_pos); }
+  void unlock_binlog_end_pos() { mysql_mutex_unlock(&LOCK_binlog_end_pos); }
+  mysql_mutex_t* get_binlog_end_pos_lock() { return &LOCK_binlog_end_pos; }
+
+  int wait_for_update_binlog_end_pos(THD* thd, struct timespec * timeout);
+
+  /*
+    Binlog position of end of the binlog.
+    Access to this is protected by LOCK_binlog_end_pos
+
+    The difference between this and last_commit_pos_{file,offset} is that
+    the commit position is updated later. If semi-sync wait point is set
+    to WAIT_AFTER_SYNC, the commit pos is update after semi-sync-ack has
+    been received and the end point is updated after the write as it's needed
+    for the dump threads to be able to semi-sync the event.
+  */
+  my_off_t binlog_end_pos;
+  char binlog_end_pos_file[FN_REFLEN];
 };
 
 class Log_event_handler
@@ -1088,4 +1155,6 @@ static inline TC_LOG *get_tc_log_implementation()
   return &tc_log_mmap;
 }
 
+void assert_LOCK_log_owner(bool owner);
+
 #endif /* LOG_H */
diff --git a/sql/mysqld.cc b/sql/mysqld.cc
index 6b9e5e4..6dc4a3a 100644
--- a/sql/mysqld.cc
+++ b/sql/mysqld.cc
@@ -5164,9 +5164,18 @@ a file name for --log-bin-index option", opt_binlog_index_name);
     unireg_abort(1);
   }
 
-  if (opt_bin_log && mysql_bin_log.open(opt_bin_logname, LOG_BIN, 0,
-                                        WRITE_CACHE, max_binlog_size, 0, TRUE))
-    unireg_abort(1);
+  if (opt_bin_log)
+  {
+    /**
+     * mutex lock is not needed here.
+     * but to be able to have mysql_mutex_assert_owner() in code,
+     * we do it anyway */
+    mysql_mutex_lock(mysql_bin_log.get_log_lock());
+    if (mysql_bin_log.open(opt_bin_logname, LOG_BIN, 0,
+                           WRITE_CACHE, max_binlog_size, 0, TRUE))
+      unireg_abort(1);
+    mysql_mutex_unlock(mysql_bin_log.get_log_lock());
+  }
 
 #ifdef HAVE_REPLICATION
   if (opt_bin_log && expire_logs_days)
diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc
index 3e3adb2..8518747 100644
--- a/sql/sql_repl.cc
+++ b/sql/sql_repl.cc
@@ -125,8 +125,9 @@ struct binlog_send_info {
   THD *thd;
   NET *net;
   String *packet;
-  char *log_file_name;
+  char *const log_file_name; // ptr/alias to linfo.log_file_name
   slave_connection_state *until_gtid_state;
+  slave_connection_state until_gtid_state_obj;
   Format_description_log_event *fdev;
   int mariadb_slave_capability;
   enum_gtid_skip_type gtid_skip_group;
@@ -138,16 +139,57 @@ struct binlog_send_info {
   bool slave_gtid_ignore_duplicates;
   bool using_gtid_state;
 
-  binlog_send_info(THD *thd_arg, String *packet_arg, ushort flags_arg, char *lfn)
+  int error;
+  const char *errmsg;
+  char error_text[MAX_SLAVE_ERRMSG];
+  rpl_gtid error_gtid;
+
+  ulonglong heartbeat_period;
+
+  /** start file/pos as requested by slave, for error message */
+  char start_log_file_name[FN_REFLEN];
+  my_off_t start_pos;
+
+  /** last pos for error message */
+  my_off_t last_pos;
+
+#ifndef DBUG_OFF
+  int left_events;
+  uint dbug_reconnect_counter;
+  ulong hb_info_counter;
+#endif
+
+  bool clear_initial_log_pos;
+  bool should_stop;
+
+  binlog_send_info(THD *thd_arg, String *packet_arg, ushort flags_arg,
+                   char *lfn)
     : thd(thd_arg), net(&thd_arg->net), packet(packet_arg),
       log_file_name(lfn), until_gtid_state(NULL), fdev(NULL),
       gtid_skip_group(GTID_SKIP_NOT), gtid_until_group(GTID_UNTIL_NOT_DONE),
       flags(flags_arg), current_checksum_alg(BINLOG_CHECKSUM_ALG_UNDEF),
       slave_gtid_strict_mode(false), send_fake_gtid_list(false),
-      slave_gtid_ignore_duplicates(false)
-  { }
+      slave_gtid_ignore_duplicates(false),
+      error(0),
+      errmsg("Unknown error"),
+      heartbeat_period(0),
+#ifndef DBUG_OFF
+      left_events(max_binlog_dump_events),
+      dbug_reconnect_counter(0),
+      hb_info_counter(0),
+#endif
+      clear_initial_log_pos(false),
+      should_stop(false)
+  {
+    error_text[0] = 0;
+    bzero(&error_gtid, sizeof(error_gtid));
+  }
 };
 
+// prototype
+static int reset_transmit_packet(struct binlog_send_info *info, ushort flags,
+                                 ulong *ev_offset, const char **errmsg);
+
 /*
     fake_rotate_event() builds a fake (=which does not exist physically in any
     binlog) Rotate event, which contains the name of the binlog we are going to
@@ -170,6 +212,7 @@ static int fake_rotate_event(binlog_send_info *info, ulonglong position,
                              const char** errmsg, uint8 checksum_alg_arg)
 {
   DBUG_ENTER("fake_rotate_event");
+  ulong ev_offset;
   char buf[ROTATE_HEADER_LEN+100];
   my_bool do_checksum;
   int err;
@@ -178,10 +221,18 @@ static int fake_rotate_event(binlog_send_info *info, ulonglong position,
   String *packet= info->packet;
   ha_checksum crc;
 
+  /* reset transmit packet for the fake rotate event below */
+  if (reset_transmit_packet(info, info->flags, &ev_offset, &info->errmsg))
+    DBUG_RETURN(1);
+
   if ((err= fake_event_header(packet, ROTATE_EVENT,
-                              ident_len + ROTATE_HEADER_LEN, &do_checksum, &crc,
+                              ident_len + ROTATE_HEADER_LEN, &do_checksum,
+                              &crc,
                               errmsg, checksum_alg_arg, 0)))
+  {
+    info->error= ER_UNKNOWN_ERROR;
     DBUG_RETURN(err);
+  }
 
   int8store(buf+R_POS_OFFSET,position);
   packet->append(buf, ROTATE_HEADER_LEN);
@@ -195,8 +246,10 @@ static int fake_rotate_event(binlog_send_info *info, ulonglong position,
 
   if ((err= fake_event_footer(packet, do_checksum, crc, errmsg)) ||
       (err= fake_event_write(info->net, packet, errmsg)))
+  {
+    info->error= ER_UNKNOWN_ERROR;
     DBUG_RETURN(err);
-
+  }
   DBUG_RETURN(0);
 }
 
@@ -215,13 +268,17 @@ static int fake_gtid_list_event(binlog_send_info *info,
   str.length(0);
   if (glev->to_packet(&str))
   {
+    info->error= ER_UNKNOWN_ERROR;
     *errmsg= "Failed due to out-of-memory writing Gtid_list event";
     return -1;
   }
   if ((err= fake_event_header(packet, GTID_LIST_EVENT,
                               str.length(), &do_checksum, &crc,
                               errmsg, info->current_checksum_alg, current_pos)))
+  {
+    info->error= ER_UNKNOWN_ERROR;
     return err;
+  }
 
   packet->append(str);
   if (do_checksum)
@@ -231,7 +288,10 @@ static int fake_gtid_list_event(binlog_send_info *info,
 
   if ((err= fake_event_footer(packet, do_checksum, crc, errmsg)) ||
       (err= fake_event_write(info->net, packet, errmsg)))
+  {
+    info->error= ER_UNKNOWN_ERROR;
     return err;
+  }
 
   return 0;
 }
@@ -243,20 +303,20 @@ static int fake_gtid_list_event(binlog_send_info *info,
   This function allocates header bytes for event transmission, and
   should be called before store the event data to the packet buffer.
 */
-static int reset_transmit_packet(THD *thd, ushort flags,
+static int reset_transmit_packet(binlog_send_info *info, ushort flags,
                                  ulong *ev_offset, const char **errmsg)
 {
   int ret= 0;
-  String *packet= &thd->packet;
+  String *packet= &info->thd->packet;
 
   /* reserve and set default header */
   packet->length(0);
   packet->set("\0", 1, &my_charset_bin);
 
-  if (RUN_HOOK(binlog_transmit, reserve_header, (thd, flags, packet)))
+  if (RUN_HOOK(binlog_transmit, reserve_header, (info->thd, flags, packet)))
   {
+    info->error= ER_UNKNOWN_ERROR;
     *errmsg= "Failed to run hook 'reserve_header'";
-    my_errno= ER_UNKNOWN_ERROR;
     ret= 1;
   }
   *ev_offset= packet->length();
@@ -556,36 +616,38 @@ bool purge_master_logs_before_date(THD* thd, time_t purge_time)
                              mysql_bin_log.purge_logs_before_date(purge_time));
 }
 
-int test_for_non_eof_log_read_errors(int error, const char **errmsg)
+void set_read_error(binlog_send_info *info, int error)
 {
   if (error == LOG_READ_EOF)
-    return 0;
-  my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
+  {
+    return;
+  }
+  info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
   switch (error) {
   case LOG_READ_BOGUS:
-    *errmsg = "bogus data in log event";
+    info->errmsg= "bogus data in log event";
     break;
   case LOG_READ_TOO_LARGE:
-    *errmsg = "log event entry exceeded max_allowed_packet; \
-Increase max_allowed_packet on master";
+    info->errmsg= "log event entry exceeded max_allowed_packet; "
+        "Increase max_allowed_packet on master";
     break;
   case LOG_READ_IO:
-    *errmsg = "I/O error reading log event";
+    info->errmsg= "I/O error reading log event";
     break;
   case LOG_READ_MEM:
-    *errmsg = "memory allocation failed reading log event";
+    info->errmsg= "memory allocation failed reading log event";
     break;
   case LOG_READ_TRUNC:
-    *errmsg = "binlog truncated in the middle of event; consider out of disk space on master";
+    info->errmsg= "binlog truncated in the middle of event; "
+        "consider out of disk space on master";
     break;
   case LOG_READ_CHECKSUM_FAILURE:
-    *errmsg = "event read from binlog did not pass crc check";
+    info->errmsg= "event read from binlog did not pass crc check";
     break;
   default:
-    *errmsg = "unknown error reading log event on the master";
+    info->errmsg= "unknown error reading log event on the master";
     break;
   }
-  return error;
 }
 
 
@@ -710,11 +772,17 @@ get_slave_until_gtid(THD *thd, String *out_str)
     The  error to send is serious and should force terminating
     the dump thread.
 */
-static int send_heartbeat_event(NET* net, String* packet,
+static int send_heartbeat_event(binlog_send_info *info,
+                                NET* net, String* packet,
                                 const struct event_coordinates *coord,
                                 uint8 checksum_alg_arg)
 {
   DBUG_ENTER("send_heartbeat_event");
+
+  ulong ev_offset;
+  if (reset_transmit_packet(info, info->flags, &ev_offset, &info->errmsg))
+    DBUG_RETURN(1);
+
   char header[LOG_EVENT_HEADER_LEN];
   my_bool do_checksum= checksum_alg_arg != BINLOG_CHECKSUM_ALG_OFF &&
     checksum_alg_arg != BINLOG_CHECKSUM_ALG_UNDEF;
@@ -753,8 +821,10 @@ static int send_heartbeat_event(NET* net, String* packet,
   if (my_net_write(net, (uchar*) packet->ptr(), packet->length()) ||
       net_flush(net))
   {
+    info->error= ER_UNKNOWN_ERROR;
     DBUG_RETURN(-1);
   }
+
   DBUG_RETURN(0);
 }
 
@@ -1552,7 +1622,7 @@ is_until_reached(binlog_send_info *info, ulong *ev_offset,
     Send a last fake Gtid_list_log_event with a flag set to mark that we
     stop due to UNTIL condition.
   */
-  if (reset_transmit_packet(info->thd, info->flags, ev_offset, errmsg))
+  if (reset_transmit_packet(info, info->flags, ev_offset, errmsg))
     return true;
   Gtid_list_log_event glev(&info->until_binlog_state,
                            Gtid_list_log_event::FLAG_UNTIL_REACHED);
@@ -1593,14 +1663,14 @@ send_event_to_slave(binlog_send_info *info, Log_event_type event_type,
                                   current_checksum_alg,
                                   &gtid_list, &list_len, info->fdev))
     {
-      my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
+      info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
       return "Failed to read Gtid_list_log_event: corrupt binlog";
     }
     err= info->until_binlog_state.load(gtid_list, list_len);
     my_free(gtid_list);
     if (err)
     {
-      my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
+      info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
       return "Failed in internal GTID book-keeping: Out of memory";
     }
   }
@@ -1622,7 +1692,7 @@ send_event_to_slave(binlog_send_info *info, Log_event_type event_type,
                                &event_gtid.domain_id, &event_gtid.server_id,
                                &event_gtid.seq_no, &flags2, info->fdev))
       {
-        my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
+        info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
         return "Failed to read Gtid_log_event: corrupt binlog";
       }
 
@@ -1634,14 +1704,14 @@ send_event_to_slave(binlog_send_info *info, Log_event_type event_type,
           {
             DBUG_SET("-d,gtid_force_reconnect_at_10_1_100");
             DBUG_SET_INITIAL("-d,gtid_force_reconnect_at_10_1_100");
-            my_errno= ER_UNKNOWN_ERROR;
+            info->error= ER_UNKNOWN_ERROR;
             return "DBUG-injected forced reconnect";
           }
         });
 
       if (info->until_binlog_state.update_nolock(&event_gtid, false))
       {
-        my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
+        info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
         return "Failed in internal GTID book-keeping: Out of memory";
       }
 
@@ -1663,7 +1733,7 @@ send_event_to_slave(binlog_send_info *info, Log_event_type event_type,
               const char *errormsg;
               *error_gtid= *gtid;
               give_error_start_pos_missing_in_binlog(&err, &errormsg, error_gtid);
-              my_errno= err;
+              info->error= err;
               return errormsg;
             }
             gtid_entry->flags&= ~(uint32)slave_connection_state::START_ON_EMPTY_DOMAIN;
@@ -1687,7 +1757,7 @@ send_event_to_slave(binlog_send_info *info, Log_event_type event_type,
                 exist, even though both the prior and subsequent seq_no exists
                 for same domain_id and server_id.
               */
-              my_errno= ER_GTID_START_FROM_BINLOG_HOLE;
+              info->error= ER_GTID_START_FROM_BINLOG_HOLE;
               *error_gtid= *gtid;
               return "The binlog on the master is missing the GTID requested "
                 "by the slave (even though both a prior and a subsequent "
@@ -1812,7 +1882,7 @@ send_event_to_slave(binlog_send_info *info, Log_event_type event_type,
       */
       if (Query_log_event::dummy_event(packet, ev_offset, current_checksum_alg))
       {
-        my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
+        info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
         return "Failed to replace row annotate event with dummy: too small event.";
       }
     }
@@ -1834,7 +1904,7 @@ send_event_to_slave(binlog_send_info *info, Log_event_type event_type,
                                                     current_checksum_alg);
     if (err)
     {
-      my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
+      info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
       return "Failed to replace GTID event with backwards-compatible event: "
              "currupt event.";
     }
@@ -1865,7 +1935,7 @@ send_event_to_slave(binlog_send_info *info, Log_event_type event_type,
       */
       if (Query_log_event::dummy_event(packet, ev_offset, current_checksum_alg))
       {
-        my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
+        info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
         return "Failed to replace binlog checkpoint or gtid list event with "
                "dummy: too small event.";
       }
@@ -1893,13 +1963,13 @@ send_event_to_slave(binlog_send_info *info, Log_event_type event_type,
   if (RUN_HOOK(binlog_transmit, before_send_event,
                (info->thd, info->flags, packet, info->log_file_name, pos)))
   {
-    my_errno= ER_UNKNOWN_ERROR;
+    info->error= ER_UNKNOWN_ERROR;
     return "run 'before_send_event' hook failed";
   }
 
   if (my_net_write(info->net, (uchar*) packet->ptr(), len))
   {
-    my_errno= ER_UNKNOWN_ERROR;
+    info->error= ER_UNKNOWN_ERROR;
     return "Failed on my_net_write()";
   }
 
@@ -1908,7 +1978,7 @@ send_event_to_slave(binlog_send_info *info, Log_event_type event_type,
   {
     if (send_file(info->thd))
     {
-      my_errno= ER_UNKNOWN_ERROR;
+      info->error= ER_UNKNOWN_ERROR;
       return "failed in send_file()";
     }
   }
@@ -1916,83 +1986,91 @@ send_event_to_slave(binlog_send_info *info, Log_event_type event_type,
   if (RUN_HOOK(binlog_transmit, after_send_event,
                (info->thd, info->flags, packet)))
   {
-    my_errno= ER_UNKNOWN_ERROR;
+    info->error= ER_UNKNOWN_ERROR;
     return "Failed to run hook 'after_send_event'";
   }
 
   return NULL;    /* Success */
 }
 
-
-void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
-		       ushort flags)
+static int check_start_offset(binlog_send_info *info,
+                              const char *log_file_name,
+                              my_off_t pos)
 {
-  LOG_INFO linfo;
-  char *log_file_name = linfo.log_file_name;
-  char search_file_name[FN_REFLEN], *name;
+  IO_CACHE log;
+  File file= -1;
 
-  ulong ev_offset;
+  /** check that requested position is inside of file */
+  if ((file=open_binlog(&log, log_file_name, &info->errmsg)) < 0)
+  {
+    info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
+    return 1;
+  }
 
-  IO_CACHE log;
-  File file = -1;
-  String* const packet= &thd->packet;
+  if (pos < BIN_LOG_HEADER_SIZE || pos > my_b_filelength(&log))
+  {
+    const char* msg= "Client requested master to start replication from "
+        "impossible position";
+
+    info->errmsg= NULL; // don't do further modifications of error_text
+    snprintf(info->error_text, sizeof(info->error_text),
+             "%s; the first event '%s' at %lld, "
+             "the last event read from '%s' at %d, "
+             "the last byte read from '%s' at %d.",
+             msg,
+             my_basename(info->start_log_file_name), pos,
+             my_basename(info->start_log_file_name), BIN_LOG_HEADER_SIZE,
+             my_basename(info->start_log_file_name), BIN_LOG_HEADER_SIZE);
+    info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
+    goto err;
+  }
+
+err:
+  end_io_cache(&log);
+  mysql_file_close(file, MYF(MY_WME));
+  return info->error;
+}
+
+static int init_binlog_sender(binlog_send_info *info,
+                              LOG_INFO *linfo,
+                              const char *log_ident,
+                              my_off_t *pos)
+{
+  THD *thd= info->thd;
   int error;
-  const char *errmsg = "Unknown error", *tmp_msg;
-  char error_text[MAX_SLAVE_ERRMSG]; // to be send to slave via my_message()
-  mysql_mutex_t *log_lock;
-  mysql_cond_t *log_cond;
   char str_buf[128];
   String connect_gtid_state(str_buf, sizeof(str_buf), system_charset_info);
   char str_buf2[128];
   String slave_until_gtid_str(str_buf2, sizeof(str_buf2), system_charset_info);
-  slave_connection_state until_gtid_state_obj;
-  rpl_gtid error_gtid;
-  binlog_send_info info(thd, packet, flags, log_file_name);
+  connect_gtid_state.length(0);
 
-  int old_max_allowed_packet= thd->variables.max_allowed_packet;
+  /** save start file/pos that was requested by slave */
+  strmake(info->start_log_file_name, log_ident,
+          sizeof(info->start_log_file_name));
+  info->start_pos= *pos;
 
-#ifndef DBUG_OFF
-  int left_events = max_binlog_dump_events;
-  uint dbug_reconnect_counter= 0;
-#endif
-  DBUG_ENTER("mysql_binlog_send");
-  DBUG_PRINT("enter",("log_ident: '%s'  pos: %ld", log_ident, (long) pos));
+  /** init last pos */
+  info->last_pos= *pos;
 
-  bzero((char*) &log,sizeof(log));
-  bzero(&error_gtid, sizeof(error_gtid));
-  /* 
-     heartbeat_period from @master_heartbeat_period user variable
-  */
-  ulonglong heartbeat_period= get_heartbeat_period(thd);
-  struct timespec heartbeat_buf;
-  struct timespec *heartbeat_ts= NULL;
-  const LOG_POS_COORD start_coord= { log_ident, pos },
-    *p_start_coord= &start_coord;
-  LOG_POS_COORD coord_buf= { log_file_name, BIN_LOG_HEADER_SIZE },
-    *p_coord= &coord_buf;
-  if (heartbeat_period != 0)
-  {
-    heartbeat_ts= &heartbeat_buf;
-    set_timespec_nsec(*heartbeat_ts, 0);
-  }
-  info.mariadb_slave_capability= get_mariadb_slave_capability(thd);
+  info->current_checksum_alg= get_binlog_checksum_value_at_connect(thd);
+  info->mariadb_slave_capability= get_mariadb_slave_capability(thd);
+  info->using_gtid_state= get_slave_connect_state(thd, &connect_gtid_state);
+  DBUG_EXECUTE_IF("simulate_non_gtid_aware_master",
+                  info->using_gtid_state= false;);
 
-  connect_gtid_state.length(0);
-  info.using_gtid_state= get_slave_connect_state(thd, &connect_gtid_state);
-  DBUG_EXECUTE_IF("simulate_non_gtid_aware_master", info.using_gtid_state= false;);
-  if (info.using_gtid_state)
+  if (info->using_gtid_state)
   {
-    info.slave_gtid_strict_mode= get_slave_gtid_strict_mode(thd);
-    info.slave_gtid_ignore_duplicates= get_slave_gtid_ignore_duplicates(thd);
-    if(get_slave_until_gtid(thd, &slave_until_gtid_str))
-      info.until_gtid_state= &until_gtid_state_obj;
+    info->slave_gtid_strict_mode= get_slave_gtid_strict_mode(thd);
+    info->slave_gtid_ignore_duplicates= get_slave_gtid_ignore_duplicates(thd);
+    if (get_slave_until_gtid(thd, &slave_until_gtid_str))
+      info->until_gtid_state= &info->until_gtid_state_obj;
   }
 
   DBUG_EXECUTE_IF("binlog_force_reconnect_after_22_events",
     {
       DBUG_SET("-d,binlog_force_reconnect_after_22_events");
       DBUG_SET_INITIAL("-d,binlog_force_reconnect_after_22_events");
-      dbug_reconnect_counter= 22;
+      info->dbug_reconnect_counter= 22;
     });
 
   /*
@@ -2008,764 +2086,829 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
                   });
 
   if (global_system_variables.log_warnings > 1)
-    sql_print_information("Start binlog_dump to slave_server(%lu), pos(%s, %lu)",
-                          thd->variables.server_id, log_ident, (ulong)pos);
-  if (RUN_HOOK(binlog_transmit, transmit_start, (thd, flags, log_ident, pos)))
-  {
-    errmsg= "Failed to run hook 'transmit_start'";
-    my_errno= ER_UNKNOWN_ERROR;
-    goto err;
-  }
+    sql_print_information(
+        "Start binlog_dump to slave_server(%lu), pos(%s, %lu)",
+        thd->variables.server_id, log_ident, (ulong)*pos);
 
 #ifndef DBUG_OFF
   if (opt_sporadic_binlog_dump_fail && (binlog_dump_count++ % 2))
   {
-    errmsg = "Master failed COM_BINLOG_DUMP to test if slave can recover";
-    my_errno= ER_UNKNOWN_ERROR;
-    goto err;
+    info->errmsg= "Master failed COM_BINLOG_DUMP to test if slave can recover";
+    info->error= ER_UNKNOWN_ERROR;
+    return 1;
   }
 #endif
 
-  if (!(info.fdev= new Format_description_log_event(3)))
-  {
-    errmsg= "Out of memory initializing format_description event";
-    my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
-    goto err;
-  }
-
   if (!mysql_bin_log.is_open())
   {
-    errmsg = "Binary log is not open";
-    my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
-    goto err;
+    info->errmsg= "Binary log is not open";
+    info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
+    return 1;
   }
   if (!server_id_supplied)
   {
-    errmsg = "Misconfigured master - server id was not set";
-    my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
-    goto err;
+    info->errmsg= "Misconfigured master - server id was not set";
+    info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
+    return 1;
   }
 
-  name=search_file_name;
-  if (info.using_gtid_state)
+  char search_file_name[FN_REFLEN];
+  const char *name=search_file_name;
+  if (info->using_gtid_state)
   {
-    if (info.gtid_state.load(connect_gtid_state.c_ptr_quick(),
+    if (info->gtid_state.load(connect_gtid_state.c_ptr_quick(),
                              connect_gtid_state.length()))
     {
-      errmsg= "Out of memory or malformed slave request when obtaining start "
-        "position from GTID state";
-      my_errno= ER_UNKNOWN_ERROR;
-      goto err;
+      info->errmsg= "Out of memory or malformed slave request when obtaining "
+          "start position from GTID state";
+      info->error= ER_UNKNOWN_ERROR;
+      return 1;
     }
-    if (info.until_gtid_state &&
-        info.until_gtid_state->load(slave_until_gtid_str.c_ptr_quick(),
+    if (info->until_gtid_state &&
+        info->until_gtid_state->load(slave_until_gtid_str.c_ptr_quick(),
                                     slave_until_gtid_str.length()))
     {
-      errmsg= "Out of memory or malformed slave request when obtaining UNTIL "
-        "position sent from slave";
-      my_errno= ER_UNKNOWN_ERROR;
-      goto err;
+      info->errmsg= "Out of memory or malformed slave request when "
+          "obtaining UNTIL position sent from slave";
+      info->error= ER_UNKNOWN_ERROR;
+      return 1;
     }
-    if ((error= check_slave_start_position(&info, &errmsg, &error_gtid)))
+    if ((error= check_slave_start_position(info, &info->errmsg,
+                                           &info->error_gtid)))
     {
-      my_errno= error;
-      goto err;
+      info->error= error;
+      return 1;
     }
-    if ((errmsg= gtid_find_binlog_file(&info.gtid_state, search_file_name,
-                                       info.until_gtid_state)))
+    if ((info->errmsg= gtid_find_binlog_file(&info->gtid_state,
+                                             search_file_name,
+                                             info->until_gtid_state)))
     {
-      my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
-      goto err;
+      info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
+      return 1;
     }
-    pos= 4;
+
+    /* start from beginning of binlog file */
+    *pos = 4;
   }
   else
   {
     if (log_ident[0])
       mysql_bin_log.make_log_name(search_file_name, log_ident);
     else
-      name=0;					// Find first log
+      name=0; // Find first log
   }
+  linfo->index_file_offset= 0;
 
-  linfo.index_file_offset = 0;
-
-  if (mysql_bin_log.find_log_pos(&linfo, name, 1))
+  if (mysql_bin_log.find_log_pos(linfo, name, 1))
   {
-    errmsg = "Could not find first log file name in binary log index file";
-    my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
-    goto err;
+    info->errmsg= "Could not find first log file name in binary "
+        "log index file";
+    info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
+    return 1;
   }
 
+  // set current pos too
+  linfo->pos= *pos;
+
+  // note: publish that we use file, before we open it
   mysql_mutex_lock(&LOCK_thread_count);
-  thd->current_linfo = &linfo;
+  thd->current_linfo= linfo;
   mysql_mutex_unlock(&LOCK_thread_count);
 
-  if ((file=open_binlog(&log, log_file_name, &errmsg)) < 0)
+  if (check_start_offset(info, linfo->log_file_name, *pos))
+    return 1;
+
+  if (*pos > BIN_LOG_HEADER_SIZE)
   {
-    my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
-    goto err;
+    /*
+      mark that first format descriptor with "log_pos=0", so the slave
+      should not increment master's binlog position
+      (rli->group_master_log_pos)
+    */
+    info->clear_initial_log_pos= true;
   }
-  if (pos < BIN_LOG_HEADER_SIZE || pos > my_b_filelength(&log))
+
+  return 0;
+}
+
+/**
+ * send format descriptor event for one binlog file
+ */
+static int send_format_descriptor_event(binlog_send_info *info,
+                                        IO_CACHE *log,
+                                        LOG_INFO *linfo,
+                                        my_off_t start_pos)
+{
+  int error;
+  ulong ev_offset;
+  THD *thd= info->thd;
+  String *packet= info->packet;
+  Log_event_type event_type;
+
+  /**
+   * 1) reset fdev before each log-file
+   * 2) read first event, should be the format descriptor
+   * 3) read second event, *might* be start encryption event
+   *    if it's isn't, seek back to undo this read
+   */
+  if (info->fdev != NULL)
+    delete info->fdev;
+
+  if (!(info->fdev= new Format_description_log_event(3)))
   {
-    errmsg= "Client requested master to start replication from \
-impossible position";
-    my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
-    goto err;
+    info->errmsg= "Out of memory initializing format_description event";
+    info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
+    return 1;
   }
 
-  /* reset transmit packet for the fake rotate event below */
-  if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg))
-    goto err;
-
-  /*
-    Tell the client about the log name with a fake Rotate event;
-    this is needed even if we also send a Format_description_log_event
-    just after, because that event does not contain the binlog's name.
-    Note that as this Rotate event is sent before
-    Format_description_log_event, the slave cannot have any info to
-    understand this event's format, so the header len of
-    Rotate_log_event is FROZEN (so in 5.0 it will have a header shorter
-    than other events except FORMAT_DESCRIPTION_EVENT).
-    Before 4.0.14 we called fake_rotate_event below only if (pos ==
-    BIN_LOG_HEADER_SIZE), because if this is false then the slave
-    already knows the binlog's name.
-    Since, we always call fake_rotate_event; if the slave already knew
-    the log's name (ex: CHANGE MASTER TO MASTER_LOG_FILE=...) this is
-    useless but does not harm much. It is nice for 3.23 (>=.58) slaves
-    which test Rotate events to see if the master is 4.0 (then they
-    choose to stop because they can't replicate 4.0); by always calling
-    fake_rotate_event we are sure that 3.23.58 and newer will detect the
-    problem as soon as replication starts (BUG#198).
-    Always calling fake_rotate_event makes sending of normal
-    (=from-binlog) Rotate events a priori unneeded, but it is not so
-    simple: the 2 Rotate events are not equivalent, the normal one is
-    before the Stop event, the fake one is after. If we don't send the
-    normal one, then the Stop event will be interpreted (by existing 4.0
-    slaves) as "the master stopped", which is wrong. So for safety,
-    given that we want minimum modification of 4.0, we send the normal
-    and fake Rotates.
-  */
-  if (fake_rotate_event(&info, pos, &errmsg,
-                        get_binlog_checksum_value_at_connect(thd)))
+  do
   {
+    /* reset transmit packet for the event read from binary log file */
+    if (reset_transmit_packet(info, info->flags, &ev_offset, &info->errmsg))
+      break;
+
     /*
-       This error code is not perfect, as fake_rotate_event() does not
-       read anything from the binlog; if it fails it's because of an
-       error in my_net_write(), fortunately it will say so in errmsg.
+      Try to find a Format_description_log_event at the beginning of
+      the binlog
     */
-    my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
-    goto err;
+    info->last_pos= my_b_tell(log);
+    error= Log_event::read_log_event(log, packet, /* LOCK_log */ NULL,
+                                     info->current_checksum_alg);
+    linfo->pos= my_b_tell(log);
+
+    if (error)
+    {
+      set_read_error(info, error);
+      break;
+    }
+
+    event_type= (Log_event_type)((uchar)(*packet)[LOG_EVENT_OFFSET+ev_offset]);
+
+    /*
+      The packet has offsets equal to the normal offsets in a
+      binlog event + ev_offset (the first ev_offset characters are
+      the header (default \0)).
+    */
+    DBUG_PRINT("info",
+               ("Looked for a Format_description_log_event, "
+                "found event type %d", (int)event_type));
+
+    if (event_type != FORMAT_DESCRIPTION_EVENT)
+    {
+      info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
+      info->errmsg= "Failed to find format descriptor event in start of binlog";
+      sql_print_warning("Failed to find format descriptor event in "
+                        "start of binlog: %s",
+                        info->log_file_name);
+      break;
+    }
+
+    info->current_checksum_alg= get_checksum_alg(packet->ptr() + ev_offset,
+                                                 packet->length() - ev_offset);
+
+    DBUG_ASSERT(info->current_checksum_alg == BINLOG_CHECKSUM_ALG_OFF ||
+                info->current_checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF ||
+                info->current_checksum_alg == BINLOG_CHECKSUM_ALG_CRC32);
+
+    if (!is_slave_checksum_aware(thd) &&
+        info->current_checksum_alg != BINLOG_CHECKSUM_ALG_OFF &&
+        info->current_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF)
+    {
+      info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
+      info->errmsg= "Slave can not handle replication events with the "
+          "checksum that master is configured to log";
+      sql_print_warning("Master is configured to log replication events "
+                        "with checksum, but will not send such events to "
+                        "slaves that cannot process them");
+      break;
+    }
+
+    Format_description_log_event *tmp;
+    if (!(tmp= new Format_description_log_event(packet->ptr()+ev_offset,
+                                                packet->length()-ev_offset,
+                                                info->fdev)))
+    {
+      info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
+      info->errmsg= "Corrupt Format_description event found "
+          "or out-of-memory";
+      break;
+    }
+    delete info->fdev;
+    info->fdev= tmp;
+
+    (*packet)[FLAGS_OFFSET+ev_offset] &= ~LOG_EVENT_BINLOG_IN_USE_F;
+
+    if (info->clear_initial_log_pos)
+    {
+      info->clear_initial_log_pos= false;
+      /*
+        mark that this event with "log_pos=0", so the slave
+        should not increment master's binlog position
+        (rli->group_master_log_pos)
+      */
+      int4store((char*) packet->ptr()+LOG_POS_OFFSET+ev_offset, (ulong) 0);
+
+      /*
+        if reconnect master sends FD event with `created' as 0
+        to avoid destroying temp tables.
+      */
+      int4store((char*) packet->ptr()+LOG_EVENT_MINIMAL_HEADER_LEN+
+                ST_CREATED_OFFSET+ev_offset, (ulong) 0);
+
+      /* fix the checksum due to latest changes in header */
+      if (info->current_checksum_alg != BINLOG_CHECKSUM_ALG_OFF &&
+        info->current_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF)
+        fix_checksum(packet, ev_offset);
+    }
+
+    /* send it */
+    if (my_net_write(info->net, (uchar*) packet->ptr(), packet->length()))
+    {
+      info->errmsg= "Failed on my_net_write()";
+      info->error= ER_UNKNOWN_ERROR;
+      break;
+    }
+
+    /** all done */
+    return 0;
+
+  } while (false);
+
+  return 1;
+}
+
+static bool should_stop(binlog_send_info *info)
+{
+  return
+      info->net->error ||
+      info->net->vio == NULL ||
+      info->thd->killed ||
+      info->error != 0 ||
+      info->should_stop;
+}
+
+/**
+ * wait for new events to enter binlog
+ * this function will send heartbeats while waiting if so configured
+ */
+static int wait_new_events(binlog_send_info *info,         /* in */
+                           LOG_INFO* linfo,                /* in */
+                           char binlog_end_pos_filename[], /* out */
+                           my_off_t *end_pos_ptr)          /* out */
+{
+  int ret= 1;
+  PSI_stage_info old_stage;
+
+  mysql_bin_log.lock_binlog_end_pos();
+  info->thd->ENTER_COND(mysql_bin_log.get_log_cond(),
+                        mysql_bin_log.get_binlog_end_pos_lock(),
+                        &stage_master_has_sent_all_binlog_to_slave,
+                        &old_stage);
+
+  while (!should_stop(info))
+  {
+    *end_pos_ptr= mysql_bin_log.get_binlog_end_pos(binlog_end_pos_filename);
+    if (strcmp(linfo->log_file_name, binlog_end_pos_filename) != 0)
+    {
+      /* there has been a log file switch, we don't need to wait */
+      ret= 0;
+      break;
+    }
+
+    if (linfo->pos < *end_pos_ptr)
+    {
+      /* there is data to read, we don't need to wait */
+      ret= 0;
+      break;
+    }
+
+    if (info->heartbeat_period)
+    {
+      struct timespec ts;
+      set_timespec_nsec(ts, info->heartbeat_period);
+      ret= mysql_bin_log.wait_for_update_binlog_end_pos(info->thd, &ts);
+      if (ret == ETIMEDOUT || ret == ETIME)
+      {
+        struct event_coordinates coord = { linfo->log_file_name, linfo->pos };
+#ifndef DBUG_OFF
+        const int hb_info_counter_limit = 3;
+        if (info->hb_info_counter < hb_info_counter_limit)
+        {
+          sql_print_information("master sends heartbeat message %s:%llu",
+                                linfo->log_file_name, linfo->pos);
+          info->hb_info_counter++;
+          if (info->hb_info_counter == hb_info_counter_limit)
+            sql_print_information("the rest of heartbeat info skipped ...");
+        }
+#endif
+        if (send_heartbeat_event(info,
+                                 info->net, info->packet, &coord,
+                                 info->current_checksum_alg))
+        {
+          ret= 1; // error
+          break;
+        }
+        /**
+         * re-read heartbeat period after each sent
+         */
+        info->heartbeat_period= get_heartbeat_period(info->thd);
+      }
+      else if (ret != 0)
+      {
+        ret= 1; // error
+        break;
+      }
+    }
+    else
+    {
+      ret= mysql_bin_log.wait_for_update_binlog_end_pos(info->thd, NULL);
+      if (ret != 0 && ret != ETIMEDOUT && ret != ETIME)
+      {
+        ret= 1; // error
+        break;
+      }
+    }
   }
 
-  /*
-    Adding MAX_LOG_EVENT_HEADER_LEN, since a binlog event can become
-    this larger than the corresponding packet (query) sent 
-    from client to master.
-  */
-  thd->variables.max_allowed_packet= MAX_MAX_ALLOWED_PACKET;
+  /* it releases the lock set in ENTER_COND */
+  info->thd->EXIT_COND(&old_stage);
+  return ret;
+}
 
-  /*
-    We can set log_lock now, it does not move (it's a member of
-    mysql_bin_log, and it's already inited, and it will be destroyed
-    only at shutdown).
-  */
-  p_coord->pos= pos; // the first hb matches the slave's last seen value
-  log_lock= mysql_bin_log.get_log_lock();
-  log_cond= mysql_bin_log.get_log_cond();
-  if (pos > BIN_LOG_HEADER_SIZE)
+/**
+ * get end pos of current log file, this function
+ * will wait if there is nothing available
+ */
+static my_off_t get_binlog_end_pos(binlog_send_info *info,
+                                   IO_CACHE* log,
+                                   LOG_INFO* linfo)
+{
+  my_off_t log_pos= my_b_tell(log);
+
+  /**
+   * get current binlog end pos
+   */
+  mysql_bin_log.lock_binlog_end_pos();
+  char binlog_end_pos_filename[FN_REFLEN];
+  my_off_t end_pos= mysql_bin_log.get_binlog_end_pos(binlog_end_pos_filename);
+  mysql_bin_log.unlock_binlog_end_pos();
+
+  do
   {
-    /* reset transmit packet for the event read from binary log
-       file */
-    if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg))
-      goto err;
+    if (strcmp(binlog_end_pos_filename, linfo->log_file_name) != 0)
+    {
+      /**
+       * this file is not active, since it's not written to again,
+       * it safe to check file length and use that as end_pos
+       */
+      end_pos= my_b_filelength(log);
 
-     /*
-       Try to find a Format_description_log_event at the beginning of
-       the binlog
-     */
-    if (!(error = Log_event::read_log_event(&log, packet, log_lock, 0)))
-    { 
-       /*
-         The packet has offsets equal to the normal offsets in a
-         binlog event + ev_offset (the first ev_offset characters are
-         the header (default \0)).
+      if (log_pos == end_pos)
+        return 0;        // already at end of file inactive file
+      else
+        return end_pos;  // return size of inactive file
+    }
+    else
+    {
+      /**
+       * this is the active file
        */
-       DBUG_PRINT("info",
-                  ("Looked for a Format_description_log_event, found event type %d",
-                   (*packet)[EVENT_TYPE_OFFSET+ev_offset]));
-       if ((*packet)[EVENT_TYPE_OFFSET+ev_offset] == FORMAT_DESCRIPTION_EVENT)
-       {
-         Format_description_log_event *tmp;
-
-         info.current_checksum_alg= get_checksum_alg(packet->ptr() + ev_offset,
-                                                     packet->length() - ev_offset);
-         DBUG_ASSERT(info.current_checksum_alg == BINLOG_CHECKSUM_ALG_OFF ||
-                     info.current_checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF ||
-                     info.current_checksum_alg == BINLOG_CHECKSUM_ALG_CRC32);
-         if (!is_slave_checksum_aware(thd) &&
-             info.current_checksum_alg != BINLOG_CHECKSUM_ALG_OFF &&
-             info.current_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF)
-         {
-           my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
-           errmsg= "Slave can not handle replication events with the checksum "
-             "that master is configured to log";
-           sql_print_warning("Master is configured to log replication events "
-                             "with checksum, but will not send such events to "
-                             "slaves that cannot process them");
-           goto err;
-         }
-
-         if (!(tmp= new Format_description_log_event(packet->ptr()+ev_offset,
-                                                     packet->length()-ev_offset,
-                                                     info.fdev)))
-         {
-           my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
-           errmsg= "Corrupt Format_description event found or out-of-memory";
-           goto err;
-         }
-         delete info.fdev;
-         info.fdev= tmp;
-
-         (*packet)[FLAGS_OFFSET+ev_offset] &= ~LOG_EVENT_BINLOG_IN_USE_F;
-         /*
-           mark that this event with "log_pos=0", so the slave
-           should not increment master's binlog position
-           (rli->group_master_log_pos)
-         */
-         int4store((char*) packet->ptr()+LOG_POS_OFFSET+ev_offset, 0);
-         /*
-           if reconnect master sends FD event with `created' as 0
-           to avoid destroying temp tables.
-          */
-         int4store((char*) packet->ptr()+LOG_EVENT_MINIMAL_HEADER_LEN+
-                   ST_CREATED_OFFSET+ev_offset, (ulong) 0);
-
-	 /* fix the checksum due to latest changes in header */
-	 if (info.current_checksum_alg != BINLOG_CHECKSUM_ALG_OFF &&
-             info.current_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF)
-           fix_checksum(packet, ev_offset);
-
-         /* send it */
-         if (my_net_write(info.net, (uchar*) packet->ptr(), packet->length()))
-         {
-           errmsg = "Failed on my_net_write()";
-           my_errno= ER_UNKNOWN_ERROR;
-           goto err;
-         }
-
-         /*
-           No need to save this event. We are only doing simple reads
-           (no real parsing of the events) so we don't need it. And so
-           we don't need the artificial Format_description_log_event of
-           3.23&4.x.
+
+      if (log_pos < end_pos)
+      {
+        /**
+         * there is data available to read
          */
-       }
-     }
-     else
-     {
-       if (test_for_non_eof_log_read_errors(error, &errmsg))
-         goto err;
-       /*
-         It's EOF, nothing to do, go on reading next events, the
-         Format_description_log_event will be found naturally if it is written.
+        return end_pos;
+      }
+
+      /**
+       * check if we should wait for more data
        */
-     }
-  } /* end of if (pos > BIN_LOG_HEADER_SIZE); */
-  else
-  {
-    /* The Format_description_log_event event will be found naturally. */
-  }
+      if ((info->flags & BINLOG_DUMP_NON_BLOCK) ||
+          (info->thd->variables.server_id == 0))
+      {
+        info->should_stop= true;
+        return 0;
+      }
 
-  /*
-    Handle the case of START SLAVE UNTIL with an UNTIL condition already
-    fulfilled at the start position.
+      /**
+       * flush data before waiting
+       */
+      if (net_flush(info->net))
+      {
+        info->errmsg= "failed on net_flush()";
+        info->error= ER_UNKNOWN_ERROR;
+        return 1;
+      }
 
-    We will send one event, the format_description, and then stop.
-  */
-  if (info.until_gtid_state && info.until_gtid_state->count() == 0)
-    info.gtid_until_group= GTID_UNTIL_STOP_AFTER_STANDALONE;
+      if (wait_new_events(info, linfo, binlog_end_pos_filename, &end_pos))
+        return 1;
+    }
+  } while (!should_stop(info));
 
-  /* seek to the requested position, to start the requested dump */
-  my_b_seek(&log, pos);			// Seek will done on next read
+  return 0;
+}
+
+/**
+ * This function sends events from one binlog file
+ * but only up until end_pos
+ *
+ * return 0 - OK
+ *        else NOK
+ */
+static int send_events(binlog_send_info *info,
+                       IO_CACHE* log,
+                       LOG_INFO* linfo,
+                       my_off_t end_pos)
+{
+  int error;
+  ulong ev_offset;
+
+  String *packet= info->packet;
+  linfo->pos= my_b_tell(log);
+  info->last_pos= my_b_tell(log);
 
-  while (!info.net->error && info.net->vio != 0 && !thd->killed)
+  while (linfo->pos < end_pos)
   {
-    Log_event_type event_type= UNKNOWN_EVENT;
-    killed_state killed;
+    if (should_stop(info))
+      return 0;
 
     /* reset the transmit packet for the event read from binary log
        file */
-    if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg))
-      goto err;
+    if (reset_transmit_packet(info, info->flags, &ev_offset, &info->errmsg))
+      return 1;
+
+    info->last_pos= linfo->pos;
+    error = Log_event::read_log_event(log, packet, /* LOCK_log */ NULL,
+                                      info->current_checksum_alg,
+                                      NULL, NULL);
+    linfo->pos= my_b_tell(log);
 
-    bool is_active_binlog= false;
-    while (!(killed= thd->killed) &&
-           !(error = Log_event::read_log_event(&log, packet, log_lock,
-                                              info.current_checksum_alg,
-                                              log_file_name,
-                                              &is_active_binlog)))
+    if (error)
     {
+      goto read_err;
+    }
+
+    Log_event_type event_type=
+        (Log_event_type)((uchar)(*packet)[LOG_EVENT_OFFSET+ev_offset]);
+
 #ifndef DBUG_OFF
-      if (max_binlog_dump_events && !left_events--)
+    if (info->dbug_reconnect_counter > 0)
+    {
+      --info->dbug_reconnect_counter;
+      if (info->dbug_reconnect_counter == 0)
       {
-	net_flush(info.net);
-	errmsg = "Debugging binlog dump abort";
-	my_errno= ER_UNKNOWN_ERROR;
-	goto err;
+        info->errmsg= "DBUG-injected forced reconnect";
+        info->error= ER_UNKNOWN_ERROR;
+        return 1;
       }
+    }
 #endif
-      /*
-        log's filename does not change while it's active
-      */
-      p_coord->pos= uint4korr(packet->ptr() + ev_offset + LOG_POS_OFFSET);
 
-      event_type=
-        (Log_event_type)((uchar)(*packet)[LOG_EVENT_OFFSET+ev_offset]);
 #ifdef ENABLED_DEBUG_SYNC
-      DBUG_EXECUTE_IF("dump_thread_wait_before_send_xid",
+    DBUG_EXECUTE_IF("dump_thread_wait_before_send_xid",
+                    {
+                      if (event_type == XID_EVENT)
                       {
-                        if (event_type == XID_EVENT)
-                        {
-                          net_flush(info.net);
-                          const char act[]=
+                        net_flush(info->net);
+                        const char act[]=
                             "now "
                             "wait_for signal.continue";
-                          DBUG_ASSERT(debug_sync_service);
-                          DBUG_ASSERT(!debug_sync_set_action(thd,
-                                                             STRING_WITH_LEN(act)));
-                          const char act2[]=
+                        DBUG_ASSERT(debug_sync_service);
+                        DBUG_ASSERT(!debug_sync_set_action(
+                            info->thd,
+                            STRING_WITH_LEN(act)));
+
+                        const char act2[]=
                             "now "
                             "signal signal.continued";
-                          DBUG_ASSERT(!debug_sync_set_action(current_thd,
-                                                             STRING_WITH_LEN(act2)));
-                        }
-                      });
+                        DBUG_ASSERT(!debug_sync_set_action(
+                            info->thd,
+                            STRING_WITH_LEN(act2)));
+                      }
+                    });
 #endif
-      if (event_type == FORMAT_DESCRIPTION_EVENT)
-      {
-        Format_description_log_event *tmp;
-
-        info.current_checksum_alg= get_checksum_alg(packet->ptr() + ev_offset,
-                                               packet->length() - ev_offset);
-        DBUG_ASSERT(info.current_checksum_alg == BINLOG_CHECKSUM_ALG_OFF ||
-                    info.current_checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF ||
-                    info.current_checksum_alg == BINLOG_CHECKSUM_ALG_CRC32);
-        if (!is_slave_checksum_aware(thd) &&
-            info.current_checksum_alg != BINLOG_CHECKSUM_ALG_OFF &&
-            info.current_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF)
-        {
-          my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
-          errmsg= "Slave can not handle replication events with the checksum "
-            "that master is configured to log";
-          sql_print_warning("Master is configured to log replication events "
-                            "with checksum, but will not send such events to "
-                            "slaves that cannot process them");
-          goto err;
-        }
 
-        if (!(tmp= new Format_description_log_event(packet->ptr()+ev_offset,
-                                                    packet->length()-ev_offset,
-                                                    info.fdev)))
-        {
-          my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
-          errmsg= "Corrupt Format_description event found or out-of-memory";
-          goto err;
-        }
-        delete info.fdev;
-        info.fdev= tmp;
+    if ((info->errmsg= send_event_to_slave(info, event_type, log,
+                                           ev_offset, &info->error_gtid)))
+      return 1;
 
-        (*packet)[FLAGS_OFFSET+ev_offset] &= ~LOG_EVENT_BINLOG_IN_USE_F;
-      }
+    if (unlikely(info->send_fake_gtid_list) &&
+        info->gtid_skip_group == GTID_SKIP_NOT)
+    {
+      Gtid_list_log_event glev(&info->until_binlog_state, 0);
 
-#ifndef DBUG_OFF
-      if (dbug_reconnect_counter > 0)
+      if (reset_transmit_packet(info, info->flags, &ev_offset, &info->errmsg) ||
+          fake_gtid_list_event(info, &glev, &info->errmsg, my_b_tell(log)))
       {
-        --dbug_reconnect_counter;
-        if (dbug_reconnect_counter == 0)
-        {
-          errmsg= "DBUG-injected forced reconnect";
-          my_errno= ER_UNKNOWN_ERROR;
-          goto err;
-        }
+        info->error= ER_UNKNOWN_ERROR;
+        return 1;
       }
-#endif
+      info->send_fake_gtid_list= false;
+    }
 
-      if ((tmp_msg= send_event_to_slave(&info, event_type, &log,
-                                        ev_offset, &error_gtid)))
+    if (info->until_gtid_state &&
+        is_until_reached(info, &ev_offset, event_type, &info->errmsg,
+                         my_b_tell(log)))
+    {
+      if (info->errmsg)
       {
-        errmsg= tmp_msg;
-        goto err;
+        info->error= ER_UNKNOWN_ERROR;
+        return 1;
       }
-      if (unlikely(info.send_fake_gtid_list) &&
-          info.gtid_skip_group == GTID_SKIP_NOT)
-      {
-        Gtid_list_log_event glev(&info.until_binlog_state, 0);
+      info->should_stop= true;
+      return 0;
+    }
+  }
 
-        if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg) ||
-            fake_gtid_list_event(&info, &glev, &errmsg, my_b_tell(&log)))
-        {
-          my_errno= ER_UNKNOWN_ERROR;
-          goto err;
-        }
-        info.send_fake_gtid_list= false;
-      }
-      if (info.until_gtid_state &&
-          is_until_reached(&info, &ev_offset, event_type, &errmsg,
-                           my_b_tell(&log)))
-      {
-        if (errmsg)
-        {
-          my_errno= ER_UNKNOWN_ERROR;
-          goto err;
-        }
-        goto end;
-      }
+  return 0;
 
-      DBUG_EXECUTE_IF("dump_thread_wait_before_send_xid",
-                      {
-                        if (event_type == XID_EVENT)
-                        {
-                          net_flush(info.net);
-                        }
-                      });
-
-      /* reset transmit packet for next loop */
-      if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg))
-        goto err;
-    }
-    if (killed)
-      goto end;
+read_err:
+  set_read_error(info, error);
 
-    DBUG_EXECUTE_IF("wait_after_binlog_EOF",
-                    {
-                      const char act[]= "now wait_for signal.rotate_finished";
-                      DBUG_ASSERT(!debug_sync_set_action(current_thd,
-                                                         STRING_WITH_LEN(act)));
-                    };);
+  return 1;
+}
 
-    /*
-      TODO: now that we are logging the offset, check to make sure
-      the recorded offset and the actual match.
-      Guilhem 2003-06: this is not true if this master is a slave
-      <4.0.15 running with --log-slave-updates, because then log_pos may
-      be the offset in the-master-of-this-master's binlog.
-    */
-    if (test_for_non_eof_log_read_errors(error, &errmsg))
-      goto err;
+/**
+ * This function sends one binlog file to slave
+ *
+ * return 0 - OK
+ *        1 - NOK
+ */
+static int send_one_binlog_file(binlog_send_info *info,
+                                IO_CACHE* log,
+                                LOG_INFO* linfo,
+                                my_off_t start_pos)
+{
+  assert_LOCK_log_owner(false); // we don't have LOCK_log
 
-    /*
-      We should only move to the next binlog when the last read event
-      came from a already deactivated binlog.
+  /* seek to the requested position, to start the requested dump */
+  if (start_pos != BIN_LOG_HEADER_SIZE)
+  {
+    my_b_seek(log, start_pos);
+    linfo->pos= start_pos;
+  }
+
+  while (!should_stop(info))
+  {
+    /**
+     * get end pos of current log file, this function
+     * will wait if there is nothing available
      */
-    if (!(flags & BINLOG_DUMP_NON_BLOCK) && is_active_binlog)
+    my_off_t end_pos= get_binlog_end_pos(info, log, linfo);
+    if (end_pos <= 1)
     {
-      /*
-	Block until there is more data in the log
-      */
-      if (net_flush(info.net))
-      {
-	errmsg = "failed on net_flush()";
-	my_errno= ER_UNKNOWN_ERROR;
-	goto err;
-      }
+      /** end of file or error */
+      return end_pos;
+    }
 
-      /*
-	We may have missed the update broadcast from the log
-	that has just happened, let's try to catch it if it did.
-	If we did not miss anything, we just wait for other threads
-	to signal us.
-      */
-      {
-	log.error=0;
-	bool read_packet = 0;
+    /**
+     * send events from current position up to end_pos
+     */
+    if (send_events(info, log, linfo, end_pos))
+      return 1;
+  }
 
-#ifndef DBUG_OFF
-	if (max_binlog_dump_events && !left_events--)
-	{
-	  errmsg = "Debugging binlog dump abort";
-	  my_errno= ER_UNKNOWN_ERROR;
-	  goto err;
-	}
-#endif
+  return 1;
+}
 
-        /* reset the transmit packet for the event read from binary log
-           file */
-        if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg))
-          goto err;
-        
-	/*
-	  No one will update the log while we are reading
-	  now, but we'll be quick and just read one record
-
-	  TODO:
-          Add an counter that is incremented for each time we update the
-          binary log.  We can avoid the following read if the counter
-          has not been updated since last read.
-	*/
-
-        mysql_mutex_lock(log_lock);
-        switch (error= Log_event::read_log_event(&log, packet, (mysql_mutex_t*) 0,
-                                                 info.current_checksum_alg)) {
-	case 0:
-	  /* we read successfully, so we'll need to send it to the slave */
-          mysql_mutex_unlock(log_lock);
-	  read_packet = 1;
-          p_coord->pos= uint4korr(packet->ptr() + ev_offset + LOG_POS_OFFSET);
-          event_type=
-            (Log_event_type)((uchar)(*packet)[LOG_EVENT_OFFSET+ev_offset]);
-	  break;
-
-	case LOG_READ_EOF:
-        {
-          int ret;
-          ulong signal_cnt;
-	  DBUG_PRINT("wait",("waiting for data in binary log"));
-          /* For mysqlbinlog (mysqlbinlog.server_id==0). */
-	  if (thd->variables.server_id==0)
-	  {
-            mysql_mutex_unlock(log_lock);
-	    goto end;
-	  }
+void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
+                       ushort flags)
+{
+  LOG_INFO linfo;
 
-#ifndef DBUG_OFF
-          ulong hb_info_counter= 0;
-#endif
-          PSI_stage_info old_stage;
-          signal_cnt= mysql_bin_log.signal_cnt;
-          do 
-          {
-            if (heartbeat_period != 0)
-            {
-              DBUG_ASSERT(heartbeat_ts);
-              set_timespec_nsec(*heartbeat_ts, heartbeat_period);
-            }
-            thd->ENTER_COND(log_cond, log_lock,
-                            &stage_master_has_sent_all_binlog_to_slave,
-                            &old_stage);
-            if (thd->killed)
-              break;
-            ret= mysql_bin_log.wait_for_update_bin_log(thd, heartbeat_ts);
-            DBUG_ASSERT(ret == 0 || (heartbeat_period != 0));
-            if (ret == ETIMEDOUT || ret == ETIME)
-            {
-#ifndef DBUG_OFF
-              if (hb_info_counter < 3)
-              {
-                sql_print_information("master sends heartbeat message");
-                hb_info_counter++;
-                if (hb_info_counter == 3)
-                  sql_print_information("the rest of heartbeat info skipped ...");
-              }
-#endif
-              /* reset transmit packet for the heartbeat event */
-              if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg))
-              {
-                thd->EXIT_COND(&old_stage);
-                goto err;
-              }
-              if (send_heartbeat_event(info.net, packet, p_coord,
-                                       info.current_checksum_alg))
-              {
-                errmsg = "Failed on my_net_write()";
-                my_errno= ER_UNKNOWN_ERROR;
-                thd->EXIT_COND(&old_stage);
-                goto err;
-              }
-            }
-            else
-            {
-              DBUG_PRINT("wait",("binary log received update or a broadcast signal caught"));
-            }
-          } while (signal_cnt == mysql_bin_log.signal_cnt);
-          thd->EXIT_COND(&old_stage);
-        }
-        break;
-            
-        default:
-          mysql_mutex_unlock(log_lock);
-          test_for_non_eof_log_read_errors(error, &errmsg);
-          goto err;
-	}
-
-        if (read_packet)
-        {
-          if ((tmp_msg= send_event_to_slave(&info, event_type, &log,
-                                            ev_offset, &error_gtid)))
-          {
-            errmsg= tmp_msg;
-            goto err;
-          }
-          if (unlikely(info.send_fake_gtid_list)
-              && info.gtid_skip_group == GTID_SKIP_NOT)
-          {
-            Gtid_list_log_event glev(&info.until_binlog_state, 0);
+  IO_CACHE log;
+  File file = -1;
+  String* const packet= &thd->packet;
 
-            if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg) ||
-                fake_gtid_list_event(&info, &glev, &errmsg, my_b_tell(&log)))
-            {
-              my_errno= ER_UNKNOWN_ERROR;
-              goto err;
-            }
-            info.send_fake_gtid_list= false;
-          }
-          if (info.until_gtid_state &&
-              is_until_reached(&info, &ev_offset, event_type, &errmsg,
-                               my_b_tell(&log)))
-          {
-            if (errmsg)
-            {
-              my_errno= ER_UNKNOWN_ERROR;
-              goto err;
-            }
-            goto end;
-          }
-        }
+  binlog_send_info infoobj(thd, packet, flags, linfo.log_file_name);
+  binlog_send_info *info= &infoobj;
 
-	log.error=0;
-      }
-    }
-    else
-    {
-      bool loop_breaker = 0;
-      /* need this to break out of the for loop from switch */
+  int old_max_allowed_packet= thd->variables.max_allowed_packet;
+  thd->variables.max_allowed_packet= MAX_MAX_ALLOWED_PACKET;
 
-      THD_STAGE_INFO(thd, stage_finished_reading_one_binlog_switching_to_next_binlog);
-      switch (mysql_bin_log.find_next_log(&linfo, 1)) {
-      case 0:
-	break;
-      case LOG_INFO_EOF:
-        if (mysql_bin_log.is_active(log_file_name))
-        {
-          loop_breaker = (flags & BINLOG_DUMP_NON_BLOCK);
-          break;
-        }
-      default:
-	errmsg = "could not find next log";
-	my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
-	goto err;
-      }
+  DBUG_ENTER("mysql_binlog_send");
+  DBUG_PRINT("enter",("log_ident: '%s'  pos: %ld", log_ident, (long) pos));
 
-      if (loop_breaker)
-        break;
+  bzero((char*) &log,sizeof(log));
 
-      end_io_cache(&log);
-      mysql_file_close(file, MYF(MY_WME));
+  if (init_binlog_sender(info, &linfo, log_ident, &pos))
+    goto err;
 
-      /* reset transmit packet for the possible fake rotate event */
-      if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg))
-        goto err;
-      
+  /*
+     run hook first when all check has been made that slave seems to
+     be requesting a reasonable position. i.e when transmit actually starts
+  */
+  if (RUN_HOOK(binlog_transmit, transmit_start, (thd, flags, log_ident, pos)))
+  {
+    info->errmsg= "Failed to run hook 'transmit_start'";
+    info->error= ER_UNKNOWN_ERROR;
+    goto err;
+  }
+
+  /*
+    heartbeat_period from @master_heartbeat_period user variable
+    NOTE: this is initialized after transmit_start-hook so that
+    the hook can affect value of heartbeat period
+  */
+  info->heartbeat_period= get_heartbeat_period(thd);
+
+  while (!should_stop(info))
+  {
+    /*
+      Tell the client about the log name with a fake Rotate event;
+      this is needed even if we also send a Format_description_log_event
+      just after, because that event does not contain the binlog's name.
+      Note that as this Rotate event is sent before
+      Format_description_log_event, the slave cannot have any info to
+      understand this event's format, so the header len of
+      Rotate_log_event is FROZEN (so in 5.0 it will have a header shorter
+      than other events except FORMAT_DESCRIPTION_EVENT).
+      Before 4.0.14 we called fake_rotate_event below only if (pos ==
+      BIN_LOG_HEADER_SIZE), because if this is false then the slave
+      already knows the binlog's name.
+      Since, we always call fake_rotate_event; if the slave already knew
+      the log's name (ex: CHANGE MASTER TO MASTER_LOG_FILE=...) this is
+      useless but does not harm much. It is nice for 3.23 (>=.58) slaves
+      which test Rotate events to see if the master is 4.0 (then they
+      choose to stop because they can't replicate 4.0); by always calling
+      fake_rotate_event we are sure that 3.23.58 and newer will detect the
+      problem as soon as replication starts (BUG#198).
+      Always calling fake_rotate_event makes sending of normal
+      (=from-binlog) Rotate events a priori unneeded, but it is not so
+      simple: the 2 Rotate events are not equivalent, the normal one is
+      before the Stop event, the fake one is after. If we don't send the
+      normal one, then the Stop event will be interpreted (by existing 4.0
+      slaves) as "the master stopped", which is wrong. So for safety,
+      given that we want minimum modification of 4.0, we send the normal
+      and fake Rotates.
+    */
+    if (fake_rotate_event(info, pos, &info->errmsg, info->current_checksum_alg))
+    {
       /*
-        Call fake_rotate_event() in case the previous log (the one which
-        we have just finished reading) did not contain a Rotate event
-        (for example (I don't know any other example) the previous log
-        was the last one before the master was shutdown & restarted).
-        This way we tell the slave about the new log's name and
-        position.  If the binlog is 5.0, the next event we are going to
-        read and send is Format_description_log_event.
+        This error code is not perfect, as fake_rotate_event() does not
+        read anything from the binlog; if it fails it's because of an
+        error in my_net_write(), fortunately it will say so in errmsg.
       */
-      if ((file=open_binlog(&log, log_file_name, &errmsg)) < 0 ||
-	  fake_rotate_event(&info, BIN_LOG_HEADER_SIZE, &errmsg,
-                            info.current_checksum_alg))
-      {
-	my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
-	goto err;
-      }
+      info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
+      goto err;
+    }
 
-      p_coord->file_name= log_file_name; // reset to the next
+    if ((file=open_binlog(&log, linfo.log_file_name, &info->errmsg)) < 0)
+    {
+      info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
+      goto err;
     }
-  }
 
-end:
-  end_io_cache(&log);
-  mysql_file_close(file, MYF(MY_WME));
+    if (send_format_descriptor_event(info, &log, &linfo, pos))
+    {
+      info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
+      goto err;
+    }
 
-  RUN_HOOK(binlog_transmit, transmit_stop, (thd, flags));
-  my_eof(thd);
+    /*
+      We want to corrupt the first event that will be sent to the slave.
+      But we do not want the corruption to happen early, eg. when client does
+      BINLOG_GTID_POS(). So test case sets a DBUG trigger which causes us to
+      set the real DBUG injection here.
+    */
+    DBUG_EXECUTE_IF("corrupt_read_log_event_to_slave_set",
+                    {
+                      DBUG_SET("-d,corrupt_read_log_event_to_slave_set");
+                      DBUG_SET("+d,corrupt_read_log_event2");
+                    });
+
+    /*
+      Handle the case of START SLAVE UNTIL with an UNTIL condition already
+      fulfilled at the start position.
+
+      We will send one event, the format_description, and then stop.
+    */
+    if (info->until_gtid_state && info->until_gtid_state->count() == 0)
+      info->gtid_until_group= GTID_UNTIL_STOP_AFTER_STANDALONE;
+
+    THD_STAGE_INFO(thd, stage_sending_binlog_event_to_slave);
+    if (send_one_binlog_file(info, &log, &linfo, pos))
+      break;
+
+    if (should_stop(info))
+      break;
+
+    DBUG_EXECUTE_IF("wait_after_binlog_EOF",
+                    {
+                      const char act[]= "now wait_for signal.rotate_finished";
+                      DBUG_ASSERT(!debug_sync_set_action(current_thd,
+                                                         STRING_WITH_LEN(act)));
+                    };);
+
+    THD_STAGE_INFO(thd,
+                   stage_finished_reading_one_binlog_switching_to_next_binlog);
+    if (mysql_bin_log.find_next_log(&linfo, 1))
+    {
+      info->errmsg= "could not find next log";
+      info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
+      break;
+    }
+
+    /** start from start of next file */
+    pos= BIN_LOG_HEADER_SIZE;
+
+    /** close current cache/file */
+    end_io_cache(&log);
+    mysql_file_close(file, MYF(MY_WME));
+    file= -1;
+  }
+
+err:
   THD_STAGE_INFO(thd, stage_waiting_to_finalize_termination);
+  RUN_HOOK(binlog_transmit, transmit_stop, (thd, flags));
+
+  const bool binlog_open = my_b_inited(&log);
+  if (file >= 0)
+  {
+    end_io_cache(&log);
+    mysql_file_close(file, MYF(MY_WME));
+  }
+
   mysql_mutex_lock(&LOCK_thread_count);
   thd->current_linfo = 0;
   mysql_mutex_unlock(&LOCK_thread_count);
   thd->variables.max_allowed_packet= old_max_allowed_packet;
-  delete info.fdev;
-  DBUG_VOID_RETURN;
+  delete info->fdev;
 
-err:
-  THD_STAGE_INFO(thd, stage_waiting_to_finalize_termination);
-  if (my_errno == ER_MASTER_FATAL_ERROR_READING_BINLOG && my_b_inited(&log))
+  if (info->error == ER_MASTER_FATAL_ERROR_READING_BINLOG && binlog_open)
   {
-    /* 
-       detailing the fatal error message with coordinates 
+    /*
+       detailing the fatal error message with coordinates
        of the last position read.
     */
-    my_snprintf(error_text, sizeof(error_text),
+    my_snprintf(info->error_text, sizeof(info->error_text),
                 "%s; the first event '%s' at %lld, "
                 "the last event read from '%s' at %lld, "
                 "the last byte read from '%s' at %lld.",
-                errmsg,
-                my_basename(p_start_coord->file_name), p_start_coord->pos,
-                my_basename(p_coord->file_name), p_coord->pos,
-                my_basename(log_file_name), my_b_tell(&log));
+                info->errmsg,
+                my_basename(info->start_log_file_name), info->start_pos,
+                my_basename(info->log_file_name), info->last_pos,
+                my_basename(info->log_file_name), linfo.pos);
   }
-  else if (my_errno == ER_GTID_POSITION_NOT_FOUND_IN_BINLOG)
+  else if (info->error == ER_GTID_POSITION_NOT_FOUND_IN_BINLOG)
   {
-    my_snprintf(error_text, sizeof(error_text),
+    my_snprintf(info->error_text, sizeof(info->error_text),
                 "Error: connecting slave requested to start from GTID "
                 "%u-%u-%llu, which is not in the master's binlog",
-                error_gtid.domain_id, error_gtid.server_id, error_gtid.seq_no);
+                info->error_gtid.domain_id,
+                info->error_gtid.server_id,
+                info->error_gtid.seq_no);
     /* Use this error code so slave will know not to try reconnect. */
-    my_errno = ER_MASTER_FATAL_ERROR_READING_BINLOG;
+    info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
   }
-  else if (my_errno == ER_GTID_POSITION_NOT_FOUND_IN_BINLOG2)
+  else if (info->error == ER_GTID_POSITION_NOT_FOUND_IN_BINLOG2)
   {
-    my_snprintf(error_text, sizeof(error_text),
+    my_snprintf(info->error_text, sizeof(info->error_text),
                 "Error: connecting slave requested to start from GTID "
                 "%u-%u-%llu, which is not in the master's binlog. Since the "
                 "master's binlog contains GTIDs with higher sequence numbers, "
                 "it probably means that the slave has diverged due to "
                 "executing extra errorneous transactions",
-                error_gtid.domain_id, error_gtid.server_id, error_gtid.seq_no);
+                info->error_gtid.domain_id,
+                info->error_gtid.server_id,
+                info->error_gtid.seq_no);
     /* Use this error code so slave will know not to try reconnect. */
-    my_errno = ER_MASTER_FATAL_ERROR_READING_BINLOG;
+    info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
   }
-  else if (my_errno == ER_GTID_START_FROM_BINLOG_HOLE)
+  else if (info->error == ER_GTID_START_FROM_BINLOG_HOLE)
   {
-    my_snprintf(error_text, sizeof(error_text),
+    my_snprintf(info->error_text, sizeof(info->error_text),
                 "The binlog on the master is missing the GTID %u-%u-%llu "
                 "requested by the slave (even though both a prior and a "
                 "subsequent sequence number does exist), and GTID strict mode "
                 "is enabled",
-                error_gtid.domain_id, error_gtid.server_id, error_gtid.seq_no);
+                info->error_gtid.domain_id,
+                info->error_gtid.server_id,
+                info->error_gtid.seq_no);
     /* Use this error code so slave will know not to try reconnect. */
-    my_errno = ER_MASTER_FATAL_ERROR_READING_BINLOG;
+    info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
   }
-  else if (my_errno == ER_CANNOT_LOAD_SLAVE_GTID_STATE)
+  else if (info->error == ER_CANNOT_LOAD_SLAVE_GTID_STATE)
   {
-    my_snprintf(error_text, sizeof(error_text),
+    my_snprintf(info->error_text, sizeof(info->error_text),
                 "Failed to load replication slave GTID state from table %s.%s",
                 "mysql", rpl_gtid_slave_state_table_name.str);
-    my_errno = ER_MASTER_FATAL_ERROR_READING_BINLOG;
+    info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
+  }
+  else if (info->error != 0 && info->errmsg != NULL)
+    strcpy(info->error_text, info->errmsg);
+
+  if (info->error == 0)
+  {
+    my_eof(thd);
   }
   else
-    strcpy(error_text, errmsg);
-  end_io_cache(&log);
-  RUN_HOOK(binlog_transmit, transmit_stop, (thd, flags));
-  /*
-    Exclude  iteration through thread list
-    this is needed for purge_logs() - it will iterate through
-    thread list and update thd->current_linfo->index_file_offset
-    this mutex will make sure that it never tried to update our linfo
-    after we return from this stack frame
-  */
-  mysql_mutex_lock(&LOCK_thread_count);
-  thd->current_linfo = 0;
-  mysql_mutex_unlock(&LOCK_thread_count);
-  if (file >= 0)
-    mysql_file_close(file, MYF(MY_WME));
-  thd->variables.max_allowed_packet= old_max_allowed_packet;
-  delete info.fdev;
+  {
+    my_message(info->error, info->error_text, MYF(0));
+  }
 
-  my_message(my_errno, error_text, MYF(0));
   DBUG_VOID_RETURN;
 }
 
