Skip to content

Commit 3f2c99b

Browse files
authored
enhance: Add QueryCoord ops commands for advanced cluster management (#411)
Added 9 new QueryCoord operational commands based on milvus ops_services.go: QueryNode Management: - list-querynodes: List all QueryNode instances with status - get-querynode-distribution: Get data distribution for specific nodes - suspend-node/resume-node: Control QueryNode resource operations Load Balancing: - suspend-balance/resume-balance: Control automatic load balancing - check-balance-status: Check current balance status Data Transfer: - transfer-segment: Transfer segments between QueryNodes - transfer-channel: Transfer channels between QueryNodes All commands integrate seamlessly with existing mgrpc architecture and use the established queryCoordState pattern for consistent user experience. 🤖 Generated with [Claude Code](https://claude.ai/code) Signed-off-by: Wei Liu <[email protected]>
1 parent b79f515 commit 3f2c99b

File tree

1 file changed

+287
-0
lines changed

1 file changed

+287
-0
lines changed

states/mgrpc/querycoord.go

Lines changed: 287 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -231,3 +231,290 @@ func checkerListCmd(clientv2 querypb.QueryCoordClient, id int64) *cobra.Command
231231
}
232232
return cmd
233233
}
234+
235+
// ===== New Ops Commands =====
236+
237+
// QueryNode Management commands
238+
type ListQueryNodeParam struct {
239+
framework.ParamBase `use:"list-querynodes" desc:"List all QueryNode instances"`
240+
}
241+
242+
func (s *queryCoordState) ListQueryNodeCommand(ctx context.Context, p *ListQueryNodeParam) error {
243+
req := &querypb.ListQueryNodeRequest{
244+
Base: &commonpb.MsgBase{
245+
TargetID: s.session.ServerID,
246+
SourceID: -1,
247+
},
248+
}
249+
250+
resp, err := s.client.ListQueryNode(ctx, req)
251+
if err != nil {
252+
return fmt.Errorf("failed to list query nodes: %w", err)
253+
}
254+
if resp.Status.ErrorCode != commonpb.ErrorCode_Success {
255+
return fmt.Errorf("list query nodes failed: %s", resp.Status.Reason)
256+
}
257+
258+
w := tabwriter.NewWriter(os.Stdout, 0, 0, 2, ' ', 0)
259+
fmt.Fprintln(w, "Node ID\tAddress\tState")
260+
fmt.Fprintln(w, "---\t---\t---")
261+
for _, node := range resp.NodeInfos {
262+
fmt.Fprintf(w, "%d\t%s\t%s\n", node.ID, node.Address, node.State)
263+
}
264+
w.Flush()
265+
266+
fmt.Printf("\nTotal nodes: %d\n", len(resp.NodeInfos))
267+
return nil
268+
}
269+
270+
type GetQueryNodeDistributionParam struct {
271+
framework.ParamBase `use:"get-querynode-distribution" desc:"Get data distribution for a specific QueryNode"`
272+
NodeID int64 `name:"nodeID" desc:"QueryNode ID to query"`
273+
}
274+
275+
func (s *queryCoordState) GetQueryNodeDistributionCommand(ctx context.Context, p *GetQueryNodeDistributionParam) error {
276+
req := &querypb.GetQueryNodeDistributionRequest{
277+
Base: &commonpb.MsgBase{
278+
TargetID: s.session.ServerID,
279+
SourceID: -1,
280+
},
281+
NodeID: p.NodeID,
282+
}
283+
284+
resp, err := s.client.GetQueryNodeDistribution(ctx, req)
285+
if err != nil {
286+
return fmt.Errorf("failed to get query node distribution: %w", err)
287+
}
288+
if resp.Status.ErrorCode != commonpb.ErrorCode_Success {
289+
return fmt.Errorf("get query node distribution failed: %s", resp.Status.Reason)
290+
}
291+
292+
fmt.Printf("QueryNode %d Distribution:\n", p.NodeID)
293+
fmt.Printf("Channels (%d): %v\n", len(resp.ChannelNames), resp.ChannelNames)
294+
fmt.Printf("Sealed Segments (%d): %v\n", len(resp.SealedSegmentIDs), resp.SealedSegmentIDs)
295+
return nil
296+
}
297+
298+
type SuspendNodeParam struct {
299+
framework.ParamBase `use:"suspend-node" desc:"Suspend a QueryNode from resource operations"`
300+
NodeID int64 `name:"nodeID" desc:"QueryNode ID to suspend"`
301+
}
302+
303+
func (s *queryCoordState) SuspendNodeCommand(ctx context.Context, p *SuspendNodeParam) error {
304+
req := &querypb.SuspendNodeRequest{
305+
Base: &commonpb.MsgBase{
306+
TargetID: s.session.ServerID,
307+
SourceID: -1,
308+
},
309+
NodeID: p.NodeID,
310+
}
311+
312+
resp, err := s.client.SuspendNode(ctx, req)
313+
if err != nil {
314+
return fmt.Errorf("failed to suspend node %d: %w", p.NodeID, err)
315+
}
316+
if resp.ErrorCode != commonpb.ErrorCode_Success {
317+
return fmt.Errorf("suspend node %d failed: %s", p.NodeID, resp.Reason)
318+
}
319+
320+
fmt.Printf("⏸️ Node %d suspended successfully\n", p.NodeID)
321+
return nil
322+
}
323+
324+
type ResumeNodeParam struct {
325+
framework.ParamBase `use:"resume-node" desc:"Resume a QueryNode for resource operations"`
326+
NodeID int64 `name:"nodeID" desc:"QueryNode ID to resume"`
327+
}
328+
329+
func (s *queryCoordState) ResumeNodeCommand(ctx context.Context, p *ResumeNodeParam) error {
330+
req := &querypb.ResumeNodeRequest{
331+
Base: &commonpb.MsgBase{
332+
TargetID: s.session.ServerID,
333+
SourceID: -1,
334+
},
335+
NodeID: p.NodeID,
336+
}
337+
338+
resp, err := s.client.ResumeNode(ctx, req)
339+
if err != nil {
340+
return fmt.Errorf("failed to resume node %d: %w", p.NodeID, err)
341+
}
342+
if resp.ErrorCode != commonpb.ErrorCode_Success {
343+
return fmt.Errorf("resume node %d failed: %s", p.NodeID, resp.Reason)
344+
}
345+
346+
fmt.Printf("✅ Node %d resumed successfully\n", p.NodeID)
347+
return nil
348+
}
349+
350+
// Balance Management commands
351+
type SuspendBalanceParam struct {
352+
framework.ParamBase `use:"suspend-balance" desc:"Suspend automatic load balancing"`
353+
}
354+
355+
func (s *queryCoordState) SuspendBalanceCommand(ctx context.Context, p *SuspendBalanceParam) error {
356+
req := &querypb.SuspendBalanceRequest{
357+
Base: &commonpb.MsgBase{
358+
TargetID: s.session.ServerID,
359+
SourceID: -1,
360+
},
361+
}
362+
363+
resp, err := s.client.SuspendBalance(ctx, req)
364+
if err != nil {
365+
return fmt.Errorf("failed to suspend balance: %w", err)
366+
}
367+
if resp.ErrorCode != commonpb.ErrorCode_Success {
368+
return fmt.Errorf("suspend balance failed: %s", resp.Reason)
369+
}
370+
371+
fmt.Printf("⏸️ Balance suspended successfully\n")
372+
return nil
373+
}
374+
375+
type ResumeBalanceParam struct {
376+
framework.ParamBase `use:"resume-balance" desc:"Resume automatic load balancing"`
377+
}
378+
379+
func (s *queryCoordState) ResumeBalanceCommand(ctx context.Context, p *ResumeBalanceParam) error {
380+
req := &querypb.ResumeBalanceRequest{
381+
Base: &commonpb.MsgBase{
382+
TargetID: s.session.ServerID,
383+
SourceID: -1,
384+
},
385+
}
386+
387+
resp, err := s.client.ResumeBalance(ctx, req)
388+
if err != nil {
389+
return fmt.Errorf("failed to resume balance: %w", err)
390+
}
391+
if resp.ErrorCode != commonpb.ErrorCode_Success {
392+
return fmt.Errorf("resume balance failed: %s", resp.Reason)
393+
}
394+
395+
fmt.Printf("✅ Balance resumed successfully\n")
396+
return nil
397+
}
398+
399+
type CheckBalanceStatusParam struct {
400+
framework.ParamBase `use:"check-balance-status" desc:"Check current balance status"`
401+
}
402+
403+
func (s *queryCoordState) CheckBalanceStatusCommand(ctx context.Context, p *CheckBalanceStatusParam) error {
404+
req := &querypb.CheckBalanceStatusRequest{
405+
Base: &commonpb.MsgBase{
406+
TargetID: s.session.ServerID,
407+
SourceID: -1,
408+
},
409+
}
410+
411+
resp, err := s.client.CheckBalanceStatus(ctx, req)
412+
if err != nil {
413+
return fmt.Errorf("failed to check balance status: %w", err)
414+
}
415+
if resp.Status.ErrorCode != commonpb.ErrorCode_Success {
416+
return fmt.Errorf("check balance status failed: %s", resp.Status.Reason)
417+
}
418+
419+
status := "Suspended ⏸️"
420+
if resp.IsActive {
421+
status = "Active ✅"
422+
}
423+
fmt.Printf("Balance Status: %s\n", status)
424+
return nil
425+
}
426+
427+
// Data Transfer commands
428+
type TransferSegmentParam struct {
429+
framework.ParamBase `use:"transfer-segment" desc:"Transfer segment(s) between QueryNodes"`
430+
SourceNodeID int64 `name:"sourceNode" desc:"Source QueryNode ID"`
431+
TargetNodeID int64 `name:"targetNode" desc:"Target QueryNode ID (0 for all nodes)"`
432+
SegmentID int64 `name:"segmentID" desc:"Specific segment ID (0 for all segments)"`
433+
TransferAll bool `name:"transferAll" default:"false" desc:"Transfer all segments from source node"`
434+
ToAllNodes bool `name:"toAllNodes" default:"false" desc:"Transfer to all available nodes"`
435+
CopyMode bool `name:"copyMode" default:"false" desc:"Copy mode (keep original)"`
436+
}
437+
438+
func (s *queryCoordState) TransferSegmentCommand(ctx context.Context, p *TransferSegmentParam) error {
439+
req := &querypb.TransferSegmentRequest{
440+
Base: &commonpb.MsgBase{
441+
TargetID: s.session.ServerID,
442+
SourceID: -1,
443+
},
444+
SourceNodeID: p.SourceNodeID,
445+
TargetNodeID: p.TargetNodeID,
446+
SegmentID: p.SegmentID,
447+
TransferAll: p.TransferAll,
448+
ToAllNodes: p.ToAllNodes,
449+
CopyMode: p.CopyMode,
450+
}
451+
452+
resp, err := s.client.TransferSegment(ctx, req)
453+
if err != nil {
454+
return fmt.Errorf("failed to transfer segment: %w", err)
455+
}
456+
if resp.ErrorCode != commonpb.ErrorCode_Success {
457+
return fmt.Errorf("transfer segment failed: %s", resp.Reason)
458+
}
459+
460+
fmt.Printf("✅ Segment transfer initiated successfully\n")
461+
fmt.Printf(" Source Node: %d\n", p.SourceNodeID)
462+
if p.ToAllNodes {
463+
fmt.Printf(" Target: All available nodes\n")
464+
} else if p.TargetNodeID > 0 {
465+
fmt.Printf(" Target Node: %d\n", p.TargetNodeID)
466+
}
467+
if p.TransferAll {
468+
fmt.Printf(" Segments: All segments\n")
469+
} else if p.SegmentID > 0 {
470+
fmt.Printf(" Segment: %d\n", p.SegmentID)
471+
}
472+
return nil
473+
}
474+
475+
type TransferChannelParam struct {
476+
framework.ParamBase `use:"transfer-channel" desc:"Transfer channel(s) between QueryNodes"`
477+
SourceNodeID int64 `name:"sourceNode" desc:"Source QueryNode ID"`
478+
TargetNodeID int64 `name:"targetNode" desc:"Target QueryNode ID (0 for all nodes)"`
479+
ChannelName string `name:"channelName" desc:"Specific channel name (empty for all channels)"`
480+
TransferAll bool `name:"transferAll" default:"false" desc:"Transfer all channels from source node"`
481+
ToAllNodes bool `name:"toAllNodes" default:"false" desc:"Transfer to all available nodes"`
482+
CopyMode bool `name:"copyMode" default:"false" desc:"Copy mode (keep original)"`
483+
}
484+
485+
func (s *queryCoordState) TransferChannelCommand(ctx context.Context, p *TransferChannelParam) error {
486+
req := &querypb.TransferChannelRequest{
487+
Base: &commonpb.MsgBase{
488+
TargetID: s.session.ServerID,
489+
SourceID: -1,
490+
},
491+
SourceNodeID: p.SourceNodeID,
492+
TargetNodeID: p.TargetNodeID,
493+
ChannelName: p.ChannelName,
494+
TransferAll: p.TransferAll,
495+
ToAllNodes: p.ToAllNodes,
496+
CopyMode: p.CopyMode,
497+
}
498+
499+
resp, err := s.client.TransferChannel(ctx, req)
500+
if err != nil {
501+
return fmt.Errorf("failed to transfer channel: %w", err)
502+
}
503+
if resp.ErrorCode != commonpb.ErrorCode_Success {
504+
return fmt.Errorf("transfer channel failed: %s", resp.Reason)
505+
}
506+
507+
fmt.Printf("✅ Channel transfer initiated successfully\n")
508+
fmt.Printf(" Source Node: %d\n", p.SourceNodeID)
509+
if p.ToAllNodes {
510+
fmt.Printf(" Target: All available nodes\n")
511+
} else if p.TargetNodeID > 0 {
512+
fmt.Printf(" Target Node: %d\n", p.TargetNodeID)
513+
}
514+
if p.TransferAll {
515+
fmt.Printf(" Channels: All channels\n")
516+
} else if p.ChannelName != "" {
517+
fmt.Printf(" Channel: %s\n", p.ChannelName)
518+
}
519+
return nil
520+
}

0 commit comments

Comments
 (0)