/*
  Fetch the next row of a table scan.

  On the first call of a scan (rnd_scan_and_first is set) this function
  builds the SELECT statement (columns, pushed-down condition, optional
  ORDER BY / GROUP BY, LIMIT and lock clause), runs it on the selected
  link(s) either through the background-search path or synchronously, and
  then clears rnd_scan_and_first. Later calls only read the next row from
  the already-fetched result via spider_db_seek_next().

  @param buf  Row buffer handed to spider_db_seek_next(). When NULL, the
              seek is skipped and 0 is returned.

  @return 0 on success; ER_QUERY_INTERRUPTED when the thread's killed flag
          is set; HA_ERR_END_OF_FILE when running under ALTER TABLE;
          HA_ERR_OUT_OF_MEM when appending the pushed-down condition fails;
          otherwise the error code of the failing helper, in most paths
          passed through check_error_mode_eof().
*/
int ha_spider::rnd_next_internal(
  uchar *buf
) {
  int error_num;
  /*
    Handler whose direct_select_limit / direct_current_offset /
    direct_select_offset counters are read and updated below.
    NOTE(review): presumably the handler shared by all partitions of the
    table, so the counters are consumed across partitions -- confirm.
  */
  ha_spider *direct_limit_offset_spider =
    (ha_spider *) partition_handler->owner;
  backup_error_status();
  DBUG_ENTER("ha_spider::rnd_next_internal");
  DBUG_PRINT("info",("spider this=%p", this));
  /* Abort early if the session has been killed. */
  if (wide_handler->trx->thd->killed)
  {
    my_error(ER_QUERY_INTERRUPTED, MYF(0));
    DBUG_RETURN(ER_QUERY_INTERRUPTED);
  }
  /* Do not copy table data during ALTER TABLE: report an empty table. */
  if (wide_handler->sql_command == SQLCOM_ALTER_TABLE)
    DBUG_RETURN(HA_ERR_END_OF_FILE);
  do_direct_update = FALSE;

  /* First call of this scan: build and execute the SELECT statement. */
  if (rnd_scan_and_first)
  {
    if ((error_num = spider_set_conn_bg_param(this)))
      DBUG_RETURN(error_num);
    check_direct_order_limit();
    check_select_column(TRUE);

    if (this->result_list.direct_limit_offset)
    {
      if (direct_limit_offset_spider->direct_select_limit == 0)
      { // no rows left to return: the limit has already been satisfied
        DBUG_RETURN(check_error_mode_eof(HA_ERR_END_OF_FILE));
      }
      if (
        partition_handler->handlers &&
        direct_limit_offset_spider->direct_current_offset > 0
      ) {
        longlong table_count = this->records();
        DBUG_PRINT("info",("spider table_count=%lld", table_count));
        if (table_count <= direct_limit_offset_spider->direct_current_offset)
        {
          // The remaining offset covers every row of this handler
          // (partition): consume its row count from the offset and skip it.
          direct_limit_offset_spider->direct_current_offset -= table_count;
          DBUG_PRINT("info",("spider direct_current_offset=%lld",
            direct_limit_offset_spider->direct_current_offset));
          DBUG_RETURN(check_error_mode_eof(HA_ERR_END_OF_FILE));
        }
      }

      // Use the remaining offset and limit for the OFFSET/LIMIT clause;
      // split_read is set to the same limit.
      DBUG_PRINT("info",("spider direct_current_offset=%lld",
        direct_limit_offset_spider->direct_current_offset));
      result_list.internal_offset = direct_limit_offset_spider->direct_current_offset;
      DBUG_PRINT("info",("spider direct_select_limit=%lld",
        direct_limit_offset_spider->direct_select_limit));
      result_list.internal_limit = direct_limit_offset_spider->direct_select_limit;
      result_list.split_read = direct_limit_offset_spider->direct_select_limit;

      // The offset is fully applied to this handler (partition), so reset
      // the remaining offset to zero.
      direct_limit_offset_spider->direct_current_offset = 0;
    }

    DBUG_PRINT("info",("spider result_list.finish_flg = FALSE"));
    result_list.finish_flg = FALSE;
    result_list.record_num = 0;
    /* Start the statement: SELECT keyword part and the column list. */
    if (
      (error_num = spider_db_append_select(this)) ||
      (error_num = spider_db_append_select_columns(this))
    )
      DBUG_RETURN(error_num);
    set_where_pos_sql(SPIDER_SQL_TYPE_SELECT_SQL);

    /*
      Append the pushed-down condition. Any non-zero result is reported as
      HA_ERR_OUT_OF_MEM; the helper's own return value is discarded.
    */
    if (spider_db_append_condition(this, NULL, 0, FALSE))
      DBUG_RETURN(HA_ERR_OUT_OF_MEM);

    set_order_pos_sql(SPIDER_SQL_TYPE_SELECT_SQL);
    /* ORDER BY for direct order/limit, otherwise GROUP BY for aggregates. */
    if (result_list.direct_order_limit)
    {
      if ((error_num =
        append_key_order_for_direct_order_limit_with_alias_sql_part(
          NULL, 0, SPIDER_SQL_TYPE_SELECT_SQL)))
        DBUG_RETURN(error_num);
    }
    else if (result_list.direct_aggregate)
    {
      if ((error_num =
        append_group_by_sql_part(NULL, 0, SPIDER_SQL_TYPE_SELECT_SQL)))
        DBUG_RETURN(error_num);
    }
    result_list.desc_flg = FALSE;
    result_list.sorted = FALSE;
    result_list.key_info = NULL;
    /* limit_num is the smaller of internal_limit and split_read. */
    result_list.limit_num =
      result_list.internal_limit >= result_list.split_read ?
      result_list.split_read : result_list.internal_limit;
    {
      /* Append the LIMIT clause, then the row-lock clause. */
      if ((error_num = append_limit_sql_part(
        result_list.internal_offset,
        result_list.limit_num,
        SPIDER_SQL_TYPE_SELECT_SQL)))
      {
        DBUG_RETURN(error_num);
      }
      if (
        (error_num = append_select_lock_sql_part(
          SPIDER_SQL_TYPE_SELECT_SQL))
      ) {
        DBUG_RETURN(error_num);
      }
    }

    /*
      Choose the links to run the statement on.
      link_ok is the link whose result is kept; results of any other link
      visited by the loop are discarded.
    */
    int roop_start, roop_end, roop_count, tmp_lock_mode, link_ok;
    tmp_lock_mode = spider_conn_lock_mode(this);
    if (tmp_lock_mode)
    {
      /* "for update" or "lock in share mode" */
      /*
        With a lock mode the loop runs up to share->link_count.
        NOTE(review): presumably link_ok is the first link in OK status and
        roop_start the first link in at least RECOVERY status -- confirm
        against spider_conn_link_idx_next().
      */
      link_ok = spider_conn_link_idx_next(share->link_statuses,
        conn_link_idx, -1, share->link_count,
        SPIDER_LINK_STATUS_OK);
      roop_start = spider_conn_link_idx_next(share->link_statuses,
        conn_link_idx, -1, share->link_count,
        SPIDER_LINK_STATUS_RECOVERY);
      roop_end = share->link_count;
    } else {
      /* Without a lock mode only search_link_idx is visited. */
      link_ok = search_link_idx;
      roop_start = search_link_idx;
      roop_end = search_link_idx + 1;
    }
    for (roop_count = roop_start; roop_count < roop_end;
      roop_count = spider_conn_link_idx_next(share->link_statuses,
        conn_link_idx, roop_count, share->link_count,
        SPIDER_LINK_STATUS_RECOVERY)
    ) {
      if (result_list.bgs_phase > 0)
      {
        /* Background-search path. */
        if ((error_num = spider_check_and_init_casual_read(
          wide_handler->trx->thd, this,
          roop_count)))
          DBUG_RETURN(error_num);
        if ((error_num = spider_bg_conn_search(this, roop_count, roop_start,
          TRUE, FALSE, (roop_count != link_ok))))
        {
          /*
            For errors other than end-of-file, when monitoring is configured
            for this link and need_mons is set, call the table monitor; its
            return value replaces error_num.
          */
          if (
            error_num != HA_ERR_END_OF_FILE &&
            share->monitoring_kind[roop_count] &&
            need_mons[roop_count]
          ) {
            error_num = spider_ping_table_mon_from_table(
                wide_handler->trx,
                wide_handler->trx->thd,
                share,
                roop_count,
                (uint32) share->monitoring_sid[roop_count],
                share->table_name,
                share->table_name_length,
                conn_link_idx[roop_count],
                NULL,
                0,
                share->monitoring_kind[roop_count],
                share->monitoring_limit[roop_count],
                share->monitoring_flag[roop_count],
                TRUE
              );
          }
          DBUG_RETURN(check_error_mode_eof(error_num));
        }
      } else {
        /* Synchronous path: execute on this link's connection directly. */
        SPIDER_CONN *conn = conns[roop_count];
        ulong sql_type;
        sql_type= SPIDER_SQL_TYPE_SELECT_SQL;
        spider_db_handler *dbton_hdl = dbton_handler[conn->dbton_id];
        /* The statement is prepared before the connection mutex is taken. */
        pthread_mutex_assert_not_owner(&conn->mta_conn_mutex);
        if ((error_num =
          dbton_hdl->set_sql_for_exec(sql_type, roop_count)))
        {
          DBUG_RETURN(error_num);
        }
        pthread_mutex_lock(&conn->mta_conn_mutex);
        SPIDER_SET_FILE_POS(&conn->mta_conn_mutex_file_pos);
        DBUG_PRINT("info",("spider sql_type=%lu", sql_type));
        conn->need_mon = &need_mons[roop_count];
        DBUG_ASSERT(!conn->mta_conn_mutex_lock_already);
        DBUG_ASSERT(!conn->mta_conn_mutex_unlock_later);
        /*
          Set both flags while this function holds the mutex.
          NOTE(review): presumably they tell the callees below that the
          mutex is already held and must not be released by them -- confirm.
        */
        conn->mta_conn_mutex_lock_already = TRUE;
        conn->mta_conn_mutex_unlock_later = TRUE;
        if ((error_num = spider_db_set_names(this, conn,
          roop_count)))
        {
          /* Failure: clear the flags and release the mutex explicitly. */
          DBUG_ASSERT(conn->mta_conn_mutex_lock_already);
          DBUG_ASSERT(conn->mta_conn_mutex_unlock_later);
          conn->mta_conn_mutex_lock_already = FALSE;
          conn->mta_conn_mutex_unlock_later = FALSE;
          SPIDER_CLEAR_FILE_POS(&conn->mta_conn_mutex_file_pos);
          pthread_mutex_unlock(&conn->mta_conn_mutex);
          if (
            share->monitoring_kind[roop_count] &&
            need_mons[roop_count]
          ) {
            error_num = spider_ping_table_mon_from_table(
                wide_handler->trx,
                wide_handler->trx->thd,
                share,
                roop_count,
                (uint32) share->monitoring_sid[roop_count],
                share->table_name,
                share->table_name_length,
                conn_link_idx[roop_count],
                NULL,
                0,
                share->monitoring_kind[roop_count],
                share->monitoring_limit[roop_count],
                share->monitoring_flag[roop_count],
                TRUE
              );
          }
          DBUG_RETURN(check_error_mode_eof(error_num));
        }
        spider_conn_set_timeout_from_share(conn, roop_count,
          wide_handler->trx->thd, share);
        if (dbton_hdl->execute_sql(
          sql_type,
          conn,
          result_list.quick_mode,
          &need_mons[roop_count])
        ) {
          /*
            Execution failed. The flags are cleared first and there is no
            explicit unlock on this path.
            NOTE(review): presumably spider_db_errorno() releases
            mta_conn_mutex when unlock_later is FALSE -- confirm.
          */
          DBUG_ASSERT(conn->mta_conn_mutex_lock_already);
          DBUG_ASSERT(conn->mta_conn_mutex_unlock_later);
          conn->mta_conn_mutex_lock_already = FALSE;
          conn->mta_conn_mutex_unlock_later = FALSE;
          error_num = spider_db_errorno(conn);
          if (
            share->monitoring_kind[roop_count] &&
            need_mons[roop_count]
          ) {
            error_num = spider_ping_table_mon_from_table(
                wide_handler->trx,
                wide_handler->trx->thd,
                share,
                roop_count,
                (uint32) share->monitoring_sid[roop_count],
                share->table_name,
                share->table_name_length,
                conn_link_idx[roop_count],
                NULL,
                0,
                share->monitoring_kind[roop_count],
                share->monitoring_limit[roop_count],
                share->monitoring_flag[roop_count],
                TRUE
              );
          }
          DBUG_RETURN(check_error_mode_eof(error_num));
        }
        /* Execution succeeded: remember which connection produced it. */
        connection_ids[roop_count] = conn->connection_id;
        DBUG_ASSERT(conn->mta_conn_mutex_lock_already);
        DBUG_ASSERT(conn->mta_conn_mutex_unlock_later);
        conn->mta_conn_mutex_lock_already = FALSE;
        conn->mta_conn_mutex_unlock_later = FALSE;
        if (roop_count == link_ok)
        {
          /*
            Keep the result of the chosen link. There is no explicit unlock
            on this branch.
            NOTE(review): presumably spider_db_store_result() releases
            mta_conn_mutex now that the flags are cleared -- confirm.
          */
          if ((error_num = spider_db_store_result(this, roop_count, table)))
          {
            if (
              error_num != HA_ERR_END_OF_FILE &&
              share->monitoring_kind[roop_count] &&
              need_mons[roop_count]
            ) {
              error_num = spider_ping_table_mon_from_table(
                  wide_handler->trx,
                  wide_handler->trx->thd,
                  share,
                  roop_count,
                  (uint32) share->monitoring_sid[roop_count],
                  share->table_name,
                  share->table_name_length,
                  conn_link_idx[roop_count],
                  NULL,
                  0,
                  share->monitoring_kind[roop_count],
                  share->monitoring_limit[roop_count],
                  share->monitoring_flag[roop_count],
                  TRUE
                );
            }
            DBUG_RETURN(check_error_mode_eof(error_num));
          }
          result_link_idx = link_ok;
        } else {
          /* Any other link: drop its result and release the mutex here. */
          spider_db_discard_result(this, roop_count, conn);
          SPIDER_CLEAR_FILE_POS(&conn->mta_conn_mutex_file_pos);
          pthread_mutex_unlock(&conn->mta_conn_mutex);
        }
      }
    }
    /* The statement has been issued; later calls only read rows. */
    rnd_scan_and_first = FALSE;

    /*
      With direct limit/offset the first row is read right away, bypassing
      the direct_select_offset countdown below.
    */
    if (this->result_list.direct_limit_offset)
    {
      if (buf && (error_num = spider_db_seek_next(buf, this, search_link_idx,
        table)))
        DBUG_RETURN(check_error_mode_eof(error_num));
      DBUG_RETURN(0);
    }
  }

  if (
    result_list.direct_limit_offset &&
    direct_limit_offset_spider->direct_select_offset > 0
  ) {
    // While direct_select_offset is positive, each call decrements it and
    // returns 0 without calling spider_db_seek_next(), so buf is not
    // written on this path.
    direct_limit_offset_spider->direct_select_offset--;
    DBUG_RETURN(0);
  }

  /* Normal case: read the next row of the stored result into buf. */
  if (buf && (error_num = spider_db_seek_next(buf, this, search_link_idx,
    table)))
    DBUG_RETURN(check_error_mode_eof(error_num));
  DBUG_RETURN(0);
}
