Skip to content

Commit 893fc40

Browse files
Add logic to support ranged GTID updates from proxysql_mysqlbinlog.
This implementation supports ranged updates to GTID sets, automatically compacting and deduping overlapping ranges when appropriate.
1 parent 8cf3e59 commit 893fc40

File tree

6 files changed

+194
-88
lines changed

6 files changed

+194
-88
lines changed

include/GTID_Server_Data.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ class GTID_Server_Data {
3131
void dump();
3232
};
3333

34+
bool addGtid(const std::string& uuid, const gtid_interval_t &iv, gtid_set_t& gtid_executed);
35+
bool addGtid(const gtid_t& gtid, gtid_set_t& gtid_executed);
3436
bool addGtidInterval(gtid_set_t& gtid_executed, std::string server_uuid, int64_t txid_start, int64_t txid_end);
3537

3638
#endif // CLASS_GTID_Server_Data_H

include/MySQL_HostGroups_Manager.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,6 @@ struct peer_runtime_mysql_servers_t;
132132
struct peer_mysql_servers_v2_t;
133133

134134
std::string gtid_executed_to_string(gtid_set_t& gtid_executed);
135-
void addGtid(const gtid_t& gtid, gtid_set_t& gtid_executed);
136135

137136
#include "GTID_Server_Data.h"
138137

include/proxysql_gtid.h

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,39 @@
22
#define PROXYSQL_GTID
33
// highly inspired by libslave
44
// https://github.com/vozbu/libslave/
5+
#include <list>
56
#include <string>
67
#include <unordered_map>
7-
#include <list>
88
#include <utility>
99

1010
typedef std::pair<std::string, int64_t> gtid_t;
11-
typedef std::pair<int64_t, int64_t> gtid_interval_t;
11+
12+
class Gtid_Interval {
13+
public:
14+
int64_t start;
15+
int64_t end;
16+
17+
private:
18+
void init(const int64_t _start, const int64_t _end);
19+
void init(const char* s);
20+
21+
public:
22+
explicit Gtid_Interval(const int64_t _start, const int64_t _end);
23+
explicit Gtid_Interval(const char* s);
24+
explicit Gtid_Interval(const std::string& s);
25+
26+
const std::string to_string(void);
27+
const bool contains(const Gtid_Interval &iv);
28+
const bool contains(int64_t gtid);
29+
const bool merge(const Gtid_Interval& other);
30+
31+
const int cmp(const Gtid_Interval& other);
32+
const bool operator<(const Gtid_Interval& other);
33+
const bool operator==(const Gtid_Interval& other);
34+
};
35+
typedef Gtid_Interval gtid_interval_t;
36+
37+
// TODO: make me a proper class.
1238
typedef std::unordered_map<std::string, std::list<gtid_interval_t>> gtid_set_t;
1339

1440
/*
@@ -31,4 +57,4 @@ class Gtid_Server_Info {
3157
};
3258
*/
3359

34-
#endif /* PROXYSQL_GTID */
60+
#endif /* PROXYSQL_GTID */

lib/GTID_Server_Data.cpp

Lines changed: 46 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -269,7 +269,7 @@ bool GTID_Server_Data::gtid_exists(char *gtid_uuid, uint64_t gtid_trxid) {
269269
return false;
270270
}
271271
for (auto itr = it->second.begin(); itr != it->second.end(); ++itr) {
272-
if ((int64_t)gtid_trxid >= itr->first && (int64_t)gtid_trxid <= itr->second) {
272+
if (itr->contains((int64_t)gtid_trxid)) {
273273
// fprintf(stderr,"YES\n");
274274
return true;
275275
}
@@ -412,12 +412,7 @@ std::string gtid_executed_to_string(gtid_set_t& gtid_executed) {
412412
s.insert(23,"-");
413413
s = s + ":";
414414
for (auto itr = it->second.begin(); itr != it->second.end(); ++itr) {
415-
std::string s2 = s;
416-
s2 = s2 + std::to_string(itr->first);
417-
s2 = s2 + "-";
418-
s2 = s2 + std::to_string(itr->second);
419-
s2 = s2 + ",";
420-
gtid_set = gtid_set + s2;
415+
gtid_set += s + itr->to_string() + ",";
421416
}
422417
}
423418
// Extract latest comma only in case 'gtid_executed' isn't empty
@@ -427,55 +422,6 @@ std::string gtid_executed_to_string(gtid_set_t& gtid_executed) {
427422
return gtid_set;
428423
}
429424

430-
431-
432-
void addGtid(const gtid_t& gtid, gtid_set_t& gtid_executed) {
433-
auto it = gtid_executed.find(gtid.first);
434-
if (it == gtid_executed.end())
435-
{
436-
gtid_executed[gtid.first].emplace_back(gtid.second, gtid.second);
437-
return;
438-
}
439-
440-
bool flag = true;
441-
for (auto itr = it->second.begin(); itr != it->second.end(); ++itr)
442-
{
443-
if (gtid.second >= itr->first && gtid.second <= itr->second)
444-
return;
445-
if (gtid.second + 1 == itr->first)
446-
{
447-
--itr->first;
448-
flag = false;
449-
break;
450-
}
451-
else if (gtid.second == itr->second + 1)
452-
{
453-
++itr->second;
454-
flag = false;
455-
break;
456-
}
457-
else if (gtid.second < itr->first)
458-
{
459-
it->second.emplace(itr, gtid.second, gtid.second);
460-
return;
461-
}
462-
}
463-
464-
if (flag)
465-
it->second.emplace_back(gtid.second, gtid.second);
466-
467-
for (auto itr = it->second.begin(); itr != it->second.end(); ++itr)
468-
{
469-
auto next_itr = std::next(itr);
470-
if (next_itr != it->second.end() && itr->second + 1 == next_itr->first)
471-
{
472-
itr->second = next_itr->second;
473-
it->second.erase(next_itr);
474-
break;
475-
}
476-
}
477-
}
478-
479425
/**
480426
* @brief Adds or updates a GTID interval in the executed set
481427
*
@@ -492,48 +438,64 @@ void addGtid(const gtid_t& gtid, gtid_set_t& gtid_executed) {
492438
*
493439
* @param gtid_executed Reference to the GTID set to update
494440
* @param server_uuid The server UUID string
495-
* @param txid_start Starting transaction ID of the interval
496-
* @param txid_end Ending transaction ID of the interval
441+
* @param iv GTID interval to merge into the existing set
497442
* @return bool True if the GTID set was updated, false if interval already existed
498443
*
499444
* @note This function is critical for maintaining accurate GTID metrics across
500445
* binlog reader reconnections and preventing events_count resets.
501446
*/
502-
bool addGtidInterval(gtid_set_t& gtid_executed, std::string server_uuid, int64_t txid_start, int64_t txid_end) {
503-
bool updated = true;
504-
505-
auto it = gtid_executed.find(server_uuid);
447+
bool addGtid(const std::string& uuid, const gtid_interval_t &iv, gtid_set_t& gtid_executed) {
448+
auto it = gtid_executed.find(uuid);
506449
if (it == gtid_executed.end()) {
507-
gtid_executed[server_uuid].emplace_back(txid_start, txid_end);
508-
return updated;
450+
// new UUID entry
451+
gtid_executed[uuid].emplace_back(iv);
452+
return true;
509453
}
510454

511-
bool insert = true;
512-
513-
// When ProxySQL reconnects with binlog reader, it might
514-
// receive updated txid intervals in the bootstrap message.
515-
// For example,
516-
// before disconnection -> server_UUID:1-10
517-
// after reconnection -> server_UUID:1-19
518-
auto &txid_intervals = it->second;
519-
for (auto &interval : txid_intervals) {
520-
if (interval.first == txid_start) {
521-
if(interval.second == txid_end) {
522-
updated = false;
523-
} else {
524-
interval.second = txid_end;
525-
}
526-
insert = false;
455+
// insert/merge GTID interval
456+
auto pos = it->second.begin();
457+
for (; pos != it->second.end(); ++pos) {
458+
if (pos->contains(iv)) {
459+
// nothing to do
460+
return false;
461+
}
462+
if (pos->merge(iv))
463+
break;
464+
}
465+
if (pos == it->second.end()) {
466+
it->second.emplace_back(iv);
467+
}
468+
469+
// merge overlapping GTID ranges, if any
470+
it->second.sort();
471+
auto a = it->second.begin();
472+
while (a != it->second.end()) {
473+
auto b = std::next(a);
474+
if (b == it->second.end()) {
527475
break;
528476
}
477+
if (a->merge(*b)) {
478+
it->second.erase(b);
479+
continue;
480+
}
481+
a++;
529482
}
530483

531-
if (insert) {
532-
txid_intervals.emplace_back(txid_start, txid_end);
484+
return true;
485+
}
533486

534-
}
487+
// Merges a single GTID into a gitd_executed instance.
488+
// TODO: make me a method when gtid_set_t is converted into a class.
489+
inline bool addGtid(const gtid_t& gtid, gtid_set_t& gtid_executed) {
490+
gtid_interval_t iv = Gtid_Interval(gtid.second, gtid.second);
491+
return addGtid(gtid.first, iv, gtid_executed);
492+
}
535493

536-
return updated;
494+
// Merges a single GTID into a gitd_executed instance.
495+
// TODO: make me a method when gtid_set_t is converted into a class.
496+
inline bool addGtidInterval(gtid_set_t& gtid_executed, std::string server_uuid, int64_t txid_start, int64_t txid_end) {
497+
gtid_interval_t iv = Gtid_Interval(txid_start, txid_end);
498+
return addGtid(server_uuid, iv, gtid_executed);
537499
}
538500

539501
void * GTID_syncer_run() {

lib/Makefile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ _OBJ_CXX := ProxySQL_GloVars.oo network.oo debug.oo configfile.oo Query_Cache.oo
7171
ProxySQL_Admin_Tests.oo ProxySQL_Admin_Tests2.oo ProxySQL_Admin_Scheduler.oo ProxySQL_Admin_Disk_Upgrade.oo ProxySQL_Admin_Stats.oo \
7272
Admin_Handler.oo Admin_FlushVariables.oo Admin_Bootstrap.oo \
7373
Base_Session.oo Base_Thread.oo \
74+
proxysql_gtid.oo \
7475
proxy_protocol_info.oo \
7576
proxysql_find_charset.oo ProxySQL_Poll.oo \
7677
PgSQL_Protocol.oo PgSQL_Thread.oo PgSQL_Data_Stream.oo PgSQL_Session.oo PgSQL_Variables.oo PgSQL_HostGroups_Manager.oo PgSQL_Connection.oo PgSQL_Backend.oo PgSQL_Logger.oo PgSQL_Authentication.oo PgSQL_Error_Helper.oo \

lib/proxysql_gtid.cpp

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
#include <cstdint>
2+
#include <cstdio>
3+
#include <cstdlib>
4+
#include <string>
5+
6+
#include "proxysql_gtid.h"
7+
8+
9+
// Initializes a GTID interval.
10+
// Implemented via private method as C++03 does not support delegated constructors :(
11+
void Gtid_Interval::init(const int64_t _start, const int64_t _end) {
12+
start = _start;
13+
end = _end;
14+
15+
if (start > end) {
16+
std::swap(start, end);
17+
}
18+
}
19+
20+
// Initializes a GTID interval from a string buffer, in [gtid]{-[gtid]} format.
21+
void Gtid_Interval::init(const char *s) {
22+
uint64_t _start, _end;
23+
24+
if (sscanf(s, "%lu-%lu", &_start, &_end) == 2) {
25+
init((int64_t)_start, (int64_t)_end);
26+
return;
27+
}
28+
if (sscanf(s, "%lu", &_start) == 1) {
29+
init((int64_t)_start, (int64_t)_start);
30+
return;
31+
}
32+
33+
init(0, 0);
34+
}
35+
36+
37+
Gtid_Interval::Gtid_Interval(const int64_t _start, const int64_t _end) {
38+
init(_start, _end);
39+
}
40+
41+
Gtid_Interval::Gtid_Interval(const char *s) {
42+
init(s);
43+
}
44+
45+
Gtid_Interval::Gtid_Interval(const std::string& s) {
46+
init(s.c_str());
47+
}
48+
49+
// Checks if a given GTID interval is contained in this interval.
50+
const bool Gtid_Interval::contains(const Gtid_Interval &other) {
51+
return (other.start >= start && other.end <= end);
52+
}
53+
54+
// Checks if a given GTID is contained in this interval.
55+
const bool Gtid_Interval::contains(int64_t gtid) {
56+
return (gtid >= start && gtid <= end);
57+
}
58+
59+
// Yields a string representation for a GTID interval.
60+
const std::string Gtid_Interval::to_string(void) {
61+
if (start == end) {
62+
return std::to_string(start);
63+
}
64+
return std::to_string(start) + "-" + std::to_string(end);
65+
}
66+
67+
// Attempts to merge two GTID intervals. Returns true if the intervals were merged (and potentially modified), false otherwise.
68+
const bool Gtid_Interval::merge(const Gtid_Interval& other) {
69+
if (other.start >= start && other.end <= end) {
70+
// other is contained by interval
71+
return true;
72+
}
73+
if (other.start <= start && other.end >= end) {
74+
// other contains whole of existing interval
75+
start = other.start;
76+
end = other.end;
77+
return true;
78+
}
79+
if (other.start <= start && other.end >= (start-1)) {
80+
// other overlaps interval at start
81+
start = other.start;
82+
return true;
83+
}
84+
if (other.end >= end && other.start <= (end+1)) {
85+
// other overlaps interval at end
86+
end = other.end;
87+
return true;
88+
}
89+
90+
return false;
91+
}
92+
93+
// Comapres two GTID intervals, by strict weak ordering.
94+
const int Gtid_Interval::cmp(const Gtid_Interval& other) {
95+
if (start < other.start) {
96+
return -1;
97+
}
98+
if (start > other.start) {
99+
return 1;
100+
}
101+
if (end < other.end) {
102+
return -1;
103+
}
104+
if (end > other.end) {
105+
return 1;
106+
}
107+
return 0;
108+
}
109+
110+
const bool Gtid_Interval::operator<(const Gtid_Interval& other) {
111+
return cmp(other) == -1;
112+
}
113+
114+
const bool Gtid_Interval::operator==(const Gtid_Interval& other) {
115+
return cmp(other) == 0;
116+
}

0 commit comments

Comments
 (0)