Skip to content

Commit 308ded0

Browse files
committed
feat(backend): Add the logic that creates the switching request for mysql switcher. issue: #13768
1 parent d33874a commit 308ded0

File tree

21 files changed

+778
-596
lines changed

21 files changed

+778
-596
lines changed

dbm-services/common/dbha-v2/etc/analysis.yaml

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ workflow:
1919
readDbMetaOffsetDuration: -24h
2020
readDbMetricOffsetDuration: -60s
2121
readDbEventOffsetDuration: -10m
22+
enableSwitching: true
2223

2324
dbmApiMetadata:
2425
api: http://127.0.0.1:80/apis/proxypass/dbmeta/dbha/instances
@@ -39,17 +40,17 @@ workflow:
3940
api: http://127.0.0.1:80/apis/proxypass/dbmeta/dbha/tendis_cluster_swap
4041
timeout: 10s
4142
token: ""
42-
43+
4344
dbmApiDomainGet:
4445
api: http://127.0.0.1:80/apis/proxypass/dns/domain/get/
4546
timeout: 10s
4647
token: ""
47-
48+
4849
dbmApiDomainDelete:
4950
api: http://127.0.0.1:80/apis/proxypass/dns/domain/delete/
5051
timeout: 10s
5152
token: ""
52-
53+
5354
dbmApiCLBDeregister:
5455
api: http://127.0.0.1:80/apis/proxypass/clb_deregister_part_target/
5556
timeout: 10s

dbm-services/common/dbha-v2/internal/analysis/config/config.go

Lines changed: 18 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -62,23 +62,24 @@ type DbmApi struct {
6262

6363
// WorkflowConfig workflow's configuration
6464
type WorkflowConfig struct {
65-
WorkerBusinessCount int `yaml:"workerBusinessCount" mapstructure:"workerBusinessCount"`
66-
LockBusinessWaitTimeout time.Duration `yaml:"lockBusinessWaitTimeout" mapstructure:"lockBusinessWaitTimeout"`
67-
ScanTimeout time.Duration `yaml:"scanTimeout" mapstructure:"scanTimeout"`
68-
ScanInterval time.Duration `yaml:"scanInterval" mapstructure:"scanInterval"`
69-
UpdateDbmCacheInterval time.Duration `yaml:"updateDbmCacheInterval" mapstructure:"updateDbmCacheInterval"`
70-
ReadDbMetaOffsetDuration time.Duration `yaml:"readDbMetaOffsetDuration" mapstructure:"readDbMetaOffsetDuration"`
71-
ReadDbMetricOffsetDuration time.Duration `yaml:"readDbMetricOffsetDuration" mapstructure:"readDbMetricOffsetDuration"`
72-
ReadDbEventOffsetDuration time.Duration `yaml:"readDbEventOffsetDuration" mapstructure:"readDbEventOffsetDuration"`
73-
DbmApiMetadata DbmApi `yaml:"dbmApiMetadata" mapstructure:"dbmApiMetadata"`
74-
DbmApiUpdateStatus DbmApi `yaml:"dbmApiUpdateStatus" mapstructure:"dbmApiUpdateStatus"`
75-
DbmApiSwapMysqlRole DbmApi `yaml:"dbmApiSwapMysqlRole" mapstructure:"dbmApiSwapMysqlRole"`
76-
DbmApiSwapTendisCluster DbmApi `yaml:"dbmApiSwapTendisCluster" mapstructure:"dbmApiSwapTendisCluster"`
77-
DbmApiDomainGet DbmApi `yaml:"dbmApiDomainGet" mapstructure:"dbmApiDomainGet"`
78-
DbmApiDomainDelete DbmApi `yaml:"dbmApiDomainDelete" mapstructure:"dbmApiDomainDelete"`
79-
DbmApiCLBDeregister DbmApi `yaml:"dbmApiCLBDeregister" mapstructure:"dbmApiCLBDeregister"`
80-
DbmApiPolarisUnbind DbmApi `yaml:"dbmApiPolarisUnbind" mapstructure:"dbmApiPolarisUnbind"`
81-
DbmApiDumperSwitch DbmApi `yaml:"dbmApiDumperSwitch" mapstructure:"dbmApiDumperSwitch"`
65+
WorkerBusinessCount int `yaml:"workerBusinessCount" mapstructure:"workerBusinessCount"`
66+
LockBusinessWaitTimeout time.Duration `yaml:"lockBusinessWaitTimeout" mapstructure:"lockBusinessWaitTimeout"`
67+
ScanTimeout time.Duration `yaml:"scanTimeout" mapstructure:"scanTimeout"`
68+
ScanInterval time.Duration `yaml:"scanInterval" mapstructure:"scanInterval"`
69+
UpdateDbmCacheInterval time.Duration `yaml:"updateDbmCacheInterval" mapstructure:"updateDbmCacheInterval"`
70+
ReadDbMetaOffsetDuration time.Duration `yaml:"readDbMetaOffsetDuration" mapstructure:"readDbMetaOffsetDuration"`
71+
ReadDbMetricOffsetDuration time.Duration `yaml:"readDbMetricOffsetDuration" mapstructure:"readDbMetricOffsetDuration"`
72+
ReadDbEventOffsetDuration time.Duration `yaml:"readDbEventOffsetDuration" mapstructure:"readDbEventOffsetDuration"`
73+
EnableSwitching bool `yaml:"enableSwitching" mapstructure:"enableSwitching"`
74+
DbmApiMetadata DbmApi `yaml:"dbmApiMetadata" mapstructure:"dbmApiMetadata"`
75+
DbmApiUpdateStatus DbmApi `yaml:"dbmApiUpdateStatus" mapstructure:"dbmApiUpdateStatus"`
76+
DbmApiSwapMysqlRole DbmApi `yaml:"dbmApiSwapMysqlRole" mapstructure:"dbmApiSwapMysqlRole"`
77+
DbmApiSwapTendisCluster DbmApi `yaml:"dbmApiSwapTendisCluster" mapstructure:"dbmApiSwapTendisCluster"`
78+
DbmApiDomainGet DbmApi `yaml:"dbmApiDomainGet" mapstructure:"dbmApiDomainGet"`
79+
DbmApiDomainDelete DbmApi `yaml:"dbmApiDomainDelete" mapstructure:"dbmApiDomainDelete"`
80+
DbmApiCLBDeregister DbmApi `yaml:"dbmApiCLBDeregister" mapstructure:"dbmApiCLBDeregister"`
81+
DbmApiPolarisUnbind DbmApi `yaml:"dbmApiPolarisUnbind" mapstructure:"dbmApiPolarisUnbind"`
82+
DbmApiDumperSwitch DbmApi `yaml:"dbmApiDumperSwitch" mapstructure:"dbmApiDumperSwitch"`
8283
}
8384

8485
// DetectorConfig detector's configuration

dbm-services/common/dbha-v2/internal/analysis/switcher/dbm_client.go renamed to dbm-services/common/dbha-v2/internal/analysis/dbm/client.go

Lines changed: 132 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
* SOFTWARE.
2323
*/
2424

25-
package switcher
25+
package dbm
2626

2727
import (
2828
"context"
@@ -38,30 +38,30 @@ import (
3838
"dbm-services/common/dbha-v2/pkg/storage/hamodel"
3939
)
4040

41-
// DbmClient provides HTTP client for communicating with DBM API services
41+
// Client provides HTTP client for communicating with DBM API services
4242
// Handles database instance management operations including status updates, domain management, and role switching
43-
type DbmClient struct {
44-
httpClient *hanet.HttpClient
43+
type Client struct {
44+
cli *hanet.HttpClient
4545
}
4646

4747
// SendRequest sends HTTP request to DBM API with specified method and timeout
48-
func (dbm *DbmClient) SendRequest(url string, method hanet.HttpMethod, req any,
48+
func (c *Client) SendRequest(url string, method hanet.HttpMethod, req any,
4949
timeout time.Duration) ([]byte, error) {
50-
if dbm.httpClient == nil {
51-
dbm.httpClient = hanet.NewHttpClientWithHeaders(map[string]string{
50+
if c.cli == nil {
51+
c.cli = hanet.NewHttpClientWithHeaders(map[string]string{
5252
"Content-Type": "application/json",
5353
})
5454
}
5555

56-
dbm.httpClient.SetTimeout(timeout)
56+
c.cli.SetTimeout(timeout)
5757

5858
data, err := json.Marshal(&req)
5959
if err != nil {
6060
logger.Warn("failed to marshal the dbm request data, errmsg: %s", err)
6161
return nil, gerrors.NewE(gerrors.InvalidParameter, err)
6262
}
6363

64-
code, resp, err := dbm.httpClient.Request(context.Background(), url, method, data)
64+
code, resp, err := c.cli.Request(context.Background(), url, method, data)
6565
if err != nil {
6666
logger.Warn("failed to send http %s request to dbm, errmsg: %s", method, err)
6767
return nil, err
@@ -74,14 +74,123 @@ func (dbm *DbmClient) SendRequest(url string, method hanet.HttpMethod, req any,
7474
return resp, nil
7575
}
7676

77+
func (c *Client) RequestMetadata(ctx context.Context, req *Request) (*Response, error) {
78+
data, err := json.Marshal(&req)
79+
if err != nil {
80+
return nil, err
81+
}
82+
83+
if c.cli == nil {
84+
c.cli = hanet.NewHttpClientWithHeaders(map[string]string{
85+
"Content-Type": "application/json",
86+
})
87+
}
88+
89+
code, resp, err := c.cli.Post(ctx, config.Cfg.Workflow.DbmApiMetadata.Api, data)
90+
if err != nil {
91+
return nil, err
92+
}
93+
94+
if http.StatusOK != code {
95+
return nil, gerrors.Newf(gerrors.HttpRequestFailure, "HTTP responded with a bad code: %d", code)
96+
}
97+
98+
if len(resp) == 0 {
99+
return nil, gerrors.New(gerrors.Failure, "DBM responded with nothing")
100+
}
101+
102+
metaRsp := &Response{}
103+
if err := json.Unmarshal(resp, metaRsp); err != nil {
104+
return nil, gerrors.Newf(gerrors.InvalidJson, "failed to unmarshal metadata response, %s", err)
105+
}
106+
107+
if len(metaRsp.Data) == 0 {
108+
return nil, gerrors.New(gerrors.Failure, "DBM responded with nothing")
109+
}
110+
111+
return metaRsp, nil
112+
}
113+
114+
func (c *Client) QueryMetadataFromDbm(ctx context.Context,
115+
bkCloudId int, ips []string) ([]*hamodel.DbmMetadata, error) {
116+
117+
req := DefaultRequest
118+
req.BkCloudId = bkCloudId
119+
req.Addresses = append(req.Addresses, ips...)
120+
req.DbCloudToken = config.Cfg.Workflow.DbmApiMetadata.Token
121+
122+
metaRsp, err := c.RequestMetadata(ctx, &req)
123+
if err != nil {
124+
return nil, err
125+
}
126+
127+
datas := []*hamodel.DbmMetadata{}
128+
for _, rsp := range metaRsp.Data {
129+
meta := &hamodel.DbmMetadata{
130+
BkIdcCityID: rsp.BkIdcCityID,
131+
BkBizID: rsp.BkBizID,
132+
BkCloudID: rsp.BkCloudID,
133+
LogicalCityID: rsp.LogicalCityID,
134+
LogicalCityName: rsp.LogicalCityName,
135+
Port: rsp.Port,
136+
IP: rsp.IP,
137+
Cluster: rsp.Cluster,
138+
ClusterID: rsp.ClusterID,
139+
ClusterType: rsp.ClusterType,
140+
MachineType: rsp.MachineType,
141+
Status: rsp.Status,
142+
}
143+
144+
if rsp.BindEntry != nil {
145+
bindEndtry, err := json.Marshal(rsp.BindEntry)
146+
if err != nil {
147+
return nil, err
148+
}
149+
150+
meta.BindEntry = string(bindEndtry)
151+
}
152+
153+
if rsp.Receiver != nil {
154+
receiver, err := json.Marshal(rsp.Receiver)
155+
if err != nil {
156+
return nil, err
157+
}
158+
159+
meta.Receiver = string(receiver)
160+
}
161+
162+
if rsp.ProxyInstanceSet != nil {
163+
proxyInsts, err := json.Marshal(rsp.ProxyInstanceSet)
164+
if err != nil {
165+
return nil, err
166+
}
167+
168+
meta.ProxyInstanceSet = string(proxyInsts)
169+
}
170+
171+
if rsp.TBinlogDumpers != nil {
172+
binlogDumpers, err := json.Marshal(rsp.TBinlogDumpers)
173+
if err != nil {
174+
return nil, err
175+
}
176+
177+
meta.BinlogDumperSet = string(binlogDumpers)
178+
}
179+
180+
datas = append(datas, meta)
181+
}
182+
183+
return datas, nil
184+
}
185+
77186
// GetAddressNumberOfDomain retrieves the number of addresses in a specific domain
78-
func (dbm *DbmClient) GetAddressNumberOfDomain(domainName string) (int, error) {
187+
func (c *Client) GetAddressNumberOfDomain(domainName string) (int, error) {
79188
req := DomainGetRequest{
80189
DbCloudToken: config.Cfg.Workflow.DbmApiDomainGet.Token,
81190
DomainName: domainName,
82191
}
83192

84-
resp, err := dbm.SendRequest(config.Cfg.Workflow.DbmApiDomainGet.Api, hanet.HttpMethodPost,
193+
resp, err := c.SendRequest(config.Cfg.Workflow.DbmApiDomainGet.Api, hanet.HttpMethodPost,
85194
req, config.Cfg.Workflow.DbmApiDomainGet.Timeout)
86195
if err != nil {
87196
return 0, err
@@ -96,7 +205,7 @@ func (dbm *DbmClient) GetAddressNumberOfDomain(domainName string) (int, error) {
96205
}
97206

98207
// UpdateInstanceStatus updates the status of a database instance
99-
func (dbm *DbmClient) UpdateInstanceStatus(ip string, port int, status hamodel.DbmMetadataStatus) error {
208+
func (c *Client) UpdateInstanceStatus(ip string, port int, status DbmMetadataStatus) error {
100209
req := UpdateInstanceStatusRequest{
101210
DbCloudToken: config.Cfg.Workflow.DbmApiUpdateStatus.Token,
102211
Payloads: []UpdateInstanceStatusPayload{
@@ -110,7 +219,7 @@ func (dbm *DbmClient) UpdateInstanceStatus(ip string, port int, status hamodel.D
110219

111220
logger.Debug("UpdateInstanceStatus req:%v", req)
112221

113-
response, err := dbm.SendRequest(config.Cfg.Workflow.DbmApiUpdateStatus.Api, hanet.HttpMethodPost,
222+
response, err := c.SendRequest(config.Cfg.Workflow.DbmApiUpdateStatus.Api, hanet.HttpMethodPost,
114223
req, config.Cfg.Workflow.DbmApiUpdateStatus.Timeout)
115224
if err != nil {
116225
logger.Error("failed to update instance (%s:%d) status, errmsg:%s", ip, port, err.Error())
@@ -122,7 +231,7 @@ func (dbm *DbmClient) UpdateInstanceStatus(ip string, port int, status hamodel.D
122231
}
123232

124233
// DeleteFromDomain removes an instance from the specified domain
125-
func (dbm *DbmClient) DeleteFromDomain(domainName string, instance string, app string) error {
234+
func (c *Client) DeleteFromDomain(domainName string, instance string, app string) error {
126235
req := DomainDeleteRequest{
127236
DbCloudToken: config.Cfg.Workflow.DbmApiDomainDelete.Token,
128237
App: app,
@@ -136,7 +245,7 @@ func (dbm *DbmClient) DeleteFromDomain(domainName string, instance string, app s
136245

137246
logger.Debug("DeleteFromDomain req:%v", req)
138247

139-
resp, err := dbm.SendRequest(config.Cfg.Workflow.DbmApiDomainDelete.Api, hanet.HttpMethodDelete,
248+
resp, err := c.SendRequest(config.Cfg.Workflow.DbmApiDomainDelete.Api, hanet.HttpMethodDelete,
140249
req, config.Cfg.Workflow.DbmApiDomainDelete.Timeout)
141250
if err != nil {
142251
return err
@@ -156,7 +265,7 @@ func (dbm *DbmClient) DeleteFromDomain(domainName string, instance string, app s
156265
}
157266

158267
// DeleteFromCLB deregisters an instance from Cloud Load Balancer
159-
func (dbm *DbmClient) DeleteFromCLB(region string, lbid string, lnid string, ins string) error {
268+
func (c *Client) DeleteFromCLB(region string, lbid string, lnid string, ins string) error {
160269
req := ClbDeleteRequest{
161270
DbCloudToken: config.Cfg.Workflow.DbmApiCLBDeregister.Token,
162271
Region: region,
@@ -167,7 +276,7 @@ func (dbm *DbmClient) DeleteFromCLB(region string, lbid string, lnid string, ins
167276

168277
logger.Debug("DeleteFromCLB req: %v", req)
169278

170-
response, err := dbm.SendRequest(config.Cfg.Workflow.DbmApiCLBDeregister.Api, hanet.HttpMethodPost,
279+
response, err := c.SendRequest(config.Cfg.Workflow.DbmApiCLBDeregister.Api, hanet.HttpMethodPost,
171280
req, config.Cfg.Workflow.DbmApiCLBDeregister.Timeout)
172281
if err != nil {
173282
logger.Error("failed to deregister instance (%s) from CLB, errmsg: %s", ins, err.Error())
@@ -179,7 +288,7 @@ func (dbm *DbmClient) DeleteFromCLB(region string, lbid string, lnid string, ins
179288
}
180289

181290
// DeleteFromPolaris unbinds an instance from Polaris service discovery
182-
func (dbm *DbmClient) DeleteFromPolaris(servname string, servtoken string, ins string) error {
291+
func (c *Client) DeleteFromPolaris(servname string, servtoken string, ins string) error {
183292
req := PolarisDeleteRequest{
184293
DbCloudToken: config.Cfg.Workflow.DbmApiPolarisUnbind.Token,
185294
ServiceName: servname,
@@ -189,7 +298,7 @@ func (dbm *DbmClient) DeleteFromPolaris(servname string, servtoken string, ins s
189298

190299
logger.Debug("DeleteFromPolaris req: %v", req)
191300

192-
response, err := dbm.SendRequest(config.Cfg.Workflow.DbmApiPolarisUnbind.Api, hanet.HttpMethodPost,
301+
response, err := c.SendRequest(config.Cfg.Workflow.DbmApiPolarisUnbind.Api, hanet.HttpMethodPost,
193302
req, config.Cfg.Workflow.DbmApiPolarisUnbind.Timeout)
194303
if err != nil {
195304
logger.Error("failed to unbind instance (%s) from Polaris, %s", ins, err.Error())
@@ -201,7 +310,7 @@ func (dbm *DbmClient) DeleteFromPolaris(servname string, servtoken string, ins s
201310
}
202311

203312
// SwapMySQLRole swaps master-slave roles between two MySQL instances
204-
func (dbm *DbmClient) SwapMySQLRole(masterIp string, masterPort int, slaveIp string, slavePort int) error {
313+
func (c *Client) SwapMySQLRole(masterIp string, masterPort int, slaveIp string, slavePort int) error {
205314
payload := SwapMySQLRolePayload{
206315
Instance1: SwapMySQLRoleInstance{
207316
IP: masterIp,
@@ -220,7 +329,7 @@ func (dbm *DbmClient) SwapMySQLRole(masterIp string, masterPort int, slaveIp str
220329

221330
logger.Debug("SwapMySQLRole req: %v", req)
222331

223-
response, err := dbm.SendRequest(config.Cfg.Workflow.DbmApiSwapMysqlRole.Api, hanet.HttpMethodPost,
332+
response, err := c.SendRequest(config.Cfg.Workflow.DbmApiSwapMysqlRole.Api, hanet.HttpMethodPost,
224333
req, config.Cfg.Workflow.DbmApiSwapMysqlRole.Timeout)
225334
if err != nil {
226335
logger.Error("failed to swap role of master(%s:%d) and slave(%s:%d), errmsg: %s",
@@ -233,7 +342,7 @@ func (dbm *DbmClient) SwapMySQLRole(masterIp string, masterPort int, slaveIp str
233342
}
234343

235344
// SwitchBinlogDumper switches binlog dumper configuration for an application
236-
func (dbm *DbmClient) SwitchBinlogDumper(app string, switchInfos []DumperSwitchInfo) error {
345+
func (c *Client) SwitchBinlogDumper(app string, switchInfos []DumperSwitchInfo) error {
237346
req := DumperSwitchRequest{
238347
DbCloudToken: config.Cfg.Workflow.DbmApiDumperSwitch.Token,
239348
IsSafe: true,
@@ -243,7 +352,7 @@ func (dbm *DbmClient) SwitchBinlogDumper(app string, switchInfos []DumperSwitchI
243352

244353
logger.Debug("SwitchBinlogDumper req: %v", req)
245354

246-
response, err := dbm.SendRequest(config.Cfg.Workflow.DbmApiDumperSwitch.Api, hanet.HttpMethodPost,
355+
response, err := c.SendRequest(config.Cfg.Workflow.DbmApiDumperSwitch.Api, hanet.HttpMethodPost,
247356
req, config.Cfg.Workflow.DbmApiDumperSwitch.Timeout)
248357
if err != nil {
249358
logger.Error("failed to switch binlogdumper, errmsg: %s", err.Error())

0 commit comments

Comments
 (0)