Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions include/GTID_Server_Data.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
#ifndef CLASS_GTID_Server_Data_H
#define CLASS_GTID_Server_Data_H

#include <cstddef>
#include <cstdint>
#include <string>
#include <proxysql_gtid.h>

class GTID_Server_Data {
public:
char *address;
Expand All @@ -24,4 +30,7 @@ class GTID_Server_Data {
void read_all_gtids();
void dump();
};

bool addGtidInterval(gtid_set_t& gtid_executed, std::string server_uuid, int64_t txid_start, int64_t txid_end);

#endif // CLASS_GTID_Server_Data_H
1 change: 1 addition & 0 deletions include/btree.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@
#include <stddef.h>
#include <string.h>
#include <sys/types.h>
#include <cstdint>
#include <algorithm>
#include <functional>
#include <iostream>
Expand Down
1 change: 1 addition & 0 deletions include/proxysql_gtid.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#define PROXYSQL_GTID
// highly inspired by libslave
// https://github.com/vozbu/libslave/
#include <string>
#include <unordered_map>
#include <list>
#include <utility>
Expand Down
168 changes: 88 additions & 80 deletions lib/GTID_Server_Data.cpp
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#include "GTID_Server_Data.h"
#include "MySQL_HostGroups_Manager.h"

#include "ev.h"
Expand Down Expand Up @@ -47,20 +48,12 @@ void reader_cb(struct ev_loop *loop, struct ev_io *w, int revents) {
bool rc = true;
rc = sd->readall();
if (rc == false) {
//delete sd;
std::string s1 = sd->address;
s1.append(":");
s1.append(std::to_string(sd->mysql_port));
MyHGM->gtid_missing_nodes = true;
proxy_warning("GTID: failed to connect to ProxySQL binlog reader on port %d for server %s:%d\n", sd->port, sd->address, sd->mysql_port);
std::unordered_map <string, GTID_Server_Data *>::iterator it2;
it2 = MyHGM->gtid_map.find(s1);
if (it2 != MyHGM->gtid_map.end()) {
//MyHGM->gtid_map.erase(it2);
it2->second = NULL;
delete sd;
}
sd->active = false;
proxy_warning("GTID: failed to read from ProxySQL binlog reader on port %d for server %s:%d\n", sd->port, sd->address, sd->mysql_port);

ev_io_stop(MyHGM->gtid_ev_loop, w);
close(w->fd);
free(w);
} else {
sd->dump();
Expand All @@ -71,48 +64,36 @@ void reader_cb(struct ev_loop *loop, struct ev_io *w, int revents) {

void connect_cb(EV_P_ ev_io *w, int revents) {
pthread_mutex_lock(&ev_loop_mutex);
struct ev_io * c = w;
if (revents & EV_WRITE) {
int optval = 0;
socklen_t optlen = sizeof(optval);
if ((getsockopt(w->fd, SOL_SOCKET, SO_ERROR, &optval, &optlen) == -1) ||
(optval != 0)) {
/* Connection failed; try the next address in the list. */
//int errnum = optval ? optval : errno;
ev_io_stop(MyHGM->gtid_ev_loop, w);
close(w->fd);
int fd = w->fd;
GTID_Server_Data *sd = (GTID_Server_Data *)w->data;

// connect() completed, this watcher is no longer needed
ev_io_stop(MyHGM->gtid_ev_loop, w);
free(w);

// Based on fd status, proceed to next step -> waiting for read event on the socket
int error = 0;
socklen_t optlen = sizeof(error);
int rc = getsockopt(fd, SOL_SOCKET, SO_ERROR, &error, &optlen);
if (rc == -1 || error != 0) {
/* connection failed */
MyHGM->gtid_missing_nodes = true;
GTID_Server_Data * custom_data = (GTID_Server_Data *)w->data;
GTID_Server_Data *sd = custom_data;
std::string s1 = sd->address;
s1.append(":");
s1.append(std::to_string(sd->mysql_port));
sd->active = false;
proxy_warning("GTID: failed to connect to ProxySQL binlog reader on port %d for server %s:%d\n", sd->port, sd->address, sd->mysql_port);
std::unordered_map <string, GTID_Server_Data *>::iterator it2;
it2 = MyHGM->gtid_map.find(s1);
if (it2 != MyHGM->gtid_map.end()) {
//MyHGM->gtid_map.erase(it2);
it2->second = NULL;
delete sd;
}
//delete custom_data;
free(c);
close(fd);
} else {
ev_io_stop(MyHGM->gtid_ev_loop, w);
int fd=w->fd;
struct ev_io * new_w = (struct ev_io*) malloc(sizeof(struct ev_io));
new_w->data = w->data;
GTID_Server_Data * custom_data = (GTID_Server_Data *)new_w->data;
custom_data->w = new_w;
free(w);
ev_io_init(new_w, reader_cb, fd, EV_READ);
ev_io_start(MyHGM->gtid_ev_loop, new_w);
struct ev_io *read_watcher = (struct ev_io *) malloc(sizeof(struct ev_io));
read_watcher->data = sd;
sd->w = read_watcher;
ev_io_init(read_watcher, reader_cb, fd, EV_READ);
ev_io_start(MyHGM->gtid_ev_loop, read_watcher);
}
}
pthread_mutex_unlock(&ev_loop_mutex);
}

struct ev_io * new_connector(char *address, uint16_t gtid_port, uint16_t mysql_port) {
struct ev_io * new_connect_watcher(char *address, uint16_t gtid_port, uint16_t mysql_port) {
int s;

if ((s = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
Expand Down Expand Up @@ -149,17 +130,13 @@ struct ev_io * new_connector(char *address, uint16_t gtid_port, uint16_t mysql_p
struct ev_io *c = (struct ev_io *)malloc(sizeof(struct ev_io));
if (c) {
ev_io_init(c, connect_cb, s, EV_WRITE);
GTID_Server_Data * custom_data = new GTID_Server_Data(c, address, gtid_port, mysql_port);
c->data = (void *)custom_data;
return c;
}
/* else error */
}
return NULL;
}



GTID_Server_Data::GTID_Server_Data(struct ev_io *_w, char *_address, uint16_t _port, uint16_t _mysql_port) {
active = true;
w = _w;
Expand Down Expand Up @@ -188,26 +165,32 @@ GTID_Server_Data::~GTID_Server_Data() {
}

bool GTID_Server_Data::readall() {
bool ret = true;
if (size == len) {
// buffer is full, expand
resize(len*2);
resize(len * 2);
}

int rc = 0;
rc = read(w->fd,data+len,size-len);
rc = read(w->fd, data+len, size-len);
if (rc > 0) {
len += rc;
return true;
}

int myerr = errno;
if (rc == 0) {
proxy_info("Read returned EOF\n");
return false;
}

// rc == -1
proxy_error("Read failed, error %d\n", myerr);
if(myerr == EINTR || myerr == EAGAIN) {
// non-blocking fd, so this should not be considered as an error
return true;
} else {
int myerr = errno;
proxy_error("Read returned %d bytes, error %d\n", rc, myerr);
if (
(rc == 0) ||
(rc==-1 && myerr != EINTR && myerr != EAGAIN)
) {
ret = false;
}
return false;
}
return ret;
}


Expand Down Expand Up @@ -239,9 +222,6 @@ void GTID_Server_Data::dump() {
return;
}
read_all_gtids();
//int rc = write(1,data+pos,len-pos);
fflush(stdout);
///pos += rc;
if (pos >= len/2) {
memmove(data,data+pos,len-pos);
len = len-pos;
Expand Down Expand Up @@ -285,13 +265,12 @@ bool GTID_Server_Data::read_next_gtid() {
bs[l-3] = '\0';
char *saveptr1=NULL;
char *saveptr2=NULL;
//char *saveptr3=NULL;
char *token = NULL;
char *subtoken = NULL;
//char *subtoken2 = NULL;
char *str1 = NULL;
char *str2 = NULL;
//char *str3 = NULL;
bool updated = false;

for (str1 = bs; ; str1 = NULL) {
token = strtok_r(str1, ",", &saveptr1);
if (token == NULL) {
Expand All @@ -312,56 +291,48 @@ bool GTID_Server_Data::read_next_gtid() {
p++;
}
}
//fprintf(stdout,"BS from %s\n", uuid_server);
} else { // we are reading the trxids
uint64_t trx_from;
uint64_t trx_to;
sscanf(subtoken,"%lu-%lu",&trx_from,&trx_to);
//fprintf(stdout,"BS from %s:%lu-%lu\n", uuid_server, trx_from, trx_to);
std::string s = uuid_server;
gtid_executed[s].emplace_back(trx_from, trx_to);
updated = addGtidInterval(gtid_executed, uuid_server, trx_from, trx_to) || updated;
}
}
}
pos += l+1;
free(bs);
//return true;

if (updated) {
events_read++;
}
} else {
strncpy(rec_msg,data+pos,l);
pos += l+1;
rec_msg[l] = 0;
//int rc = write(1,data+pos,l+1);
//fprintf(stdout,"%s\n", rec_msg);
if (rec_msg[0]=='I') {
//char rec_uuid[80];
uint64_t rec_trxid = 0;
char *a = NULL;
int ul = 0;
switch (rec_msg[1]) {
case '1':
//sscanf(rec_msg+3,"%s\:%lu",uuid_server,&rec_trxid);
a = strchr(rec_msg+3,':');
ul = a-rec_msg-3;
strncpy(uuid_server,rec_msg+3,ul);
uuid_server[ul] = 0;
rec_trxid=atoll(a+1);
break;
case '2':
//sscanf(rec_msg+3,"%lu",&rec_trxid);
rec_trxid=atoll(rec_msg+3);
break;
default:
break;
}
//fprintf(stdout,"%s:%lu\n", uuid_server, rec_trxid);
std::string s = uuid_server;
gtid_t new_gtid = std::make_pair(s,rec_trxid);
addGtid(new_gtid,gtid_executed);
events_read++;
//return true;
}
}
//std::cout << "current pos " << gtid_executed_to_string(gtid_executed) << std::endl << std::endl;
return true;
}

Expand Down Expand Up @@ -439,6 +410,43 @@ void addGtid(const gtid_t& gtid, gtid_set_t& gtid_executed) {
}
}

bool addGtidInterval(gtid_set_t& gtid_executed, std::string server_uuid, int64_t txid_start, int64_t txid_end) {
bool updated = true;

auto it = gtid_executed.find(server_uuid);
if (it == gtid_executed.end()) {
gtid_executed[server_uuid].emplace_back(txid_start, txid_end);
return updated;
}

bool insert = true;

// When ProxySQL reconnects with binlog reader, it might
// receive updated txid intervals in the bootstrap message.
// For example,
// before disconnection -> server_UUID:1-10
// after reconnection -> server_UUID:1-19
auto &txid_intervals = it->second;
for (auto &interval : txid_intervals) {
if (interval.first == txid_start) {
if(interval.second == txid_end) {
updated = false;
} else {
interval.second = txid_end;
}
insert = false;
break;
}
}

if (insert) {
txid_intervals.emplace_back(txid_start, txid_end);

}

return updated;
}

void * GTID_syncer_run() {
//struct ev_loop * gtid_ev_loop;
//gtid_ev_loop = NULL;
Expand Down
Loading