Skip to content

Commit dfe87b9

Browse files
authored
Don't assume an orchestrator port number, but pull it from nomad (#1368)
1 parent 8f2c2de commit dfe87b9

File tree

7 files changed

+429
-17
lines changed

7 files changed

+429
-17
lines changed

.editorconfig

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
[*.hcl]
2+
indent_size = 2

iac/provider-gcp/nomad/jobs/orchestrator.hcl

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,23 +5,39 @@ job "orchestrator-${latest_orchestrator_job_id}" {
55
priority = 90
66

77
group "client-orchestrator" {
8+
network {
9+
mode = "host"
10+
11+
port "grpc" {
12+
// todo: remove this once all API and client-proxy jobs
13+
// can pull the port number from nomad.
14+
static = "${port}"
15+
}
16+
17+
port "proxy" {
18+
// todo: remove this once all API and client-proxy jobs
19+
// can pull the port number from nomad.
20+
static = "${proxy_port}"
21+
}
22+
}
23+
824
service {
925
name = "orchestrator"
10-
port = "${port}"
26+
port = "grpc"
1127

1228
check {
1329
type = "grpc"
1430
name = "health"
1531
interval = "20s"
1632
timeout = "5s"
1733
grpc_use_tls = false
18-
port = "${port}"
34+
port = "grpc"
1935
}
2036
}
2137

2238
service {
2339
name = "orchestrator-proxy"
24-
port = "${proxy_port}"
40+
port = "proxy"
2541
}
2642

2743
task "check-placement" {
@@ -74,8 +90,8 @@ EOT
7490
CLICKHOUSE_CONNECTION_STRING = "${clickhouse_connection_string}"
7591
REDIS_URL = "${redis_url}"
7692
REDIS_CLUSTER_URL = "${redis_cluster_url}"
77-
GRPC_PORT = "${port}"
78-
PROXY_PORT = "${proxy_port}"
93+
GRPC_PORT = "$${NOMAD_PORT_grpc}"
94+
PROXY_PORT = "$${NOMAD_PORT_proxy}"
7995
GIN_MODE = "release"
8096

8197
%{ if launch_darkly_api_key != "" }

packages/api/internal/cfg/model.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,13 @@ type Config struct {
1616
NomadAddress string `env:"NOMAD_ADDRESS" envDefault:"http://localhost:4646"`
1717
NomadToken string `env:"NOMAD_TOKEN"`
1818

19+
// DefaultOrchestratorPort is the port that the Orchestrator listens on.
20+
// Deprecated: Nomad knows which port the orchestrator is listening on. Keep this
21+
// around temporarily until all nomad jobs have a port labeled "grpc", then this can be removed.
22+
DefaultOrchestratorPort int `env:"ORCHESTRATOR_PORT" envDefault:"5008"`
23+
24+
OrchestratorPortLabel string `env:"ORCHESTRATOR_PORT_LABEL" envDefault:"grpc"`
25+
1926
PostgresConnectionString string `env:"POSTGRES_CONNECTION_STRING,required,notEmpty"`
2027

2128
PosthogAPIKey string `env:"POSTHOG_API_KEY"`

packages/api/internal/orchestrator/client.go

Lines changed: 103 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -81,26 +81,121 @@ func (o *Orchestrator) listNomadNodes(ctx context.Context) ([]nodemanager.NomadS
8181
defer listSpan.End()
8282

8383
options := &nomadapi.QueryOptions{
84-
// TODO: Use variable for node pool name ("default")
85-
Filter: "Status == \"ready\" and NodePool == \"default\"",
84+
Filter: `ClientStatus == "running" and JobID contains "orchestrator-"`,
85+
Params: map[string]string{"resources": "true"},
8686
}
87-
nomadNodes, _, err := o.nomadClient.Nodes().List(options.WithContext(ctx))
87+
nomadAllocations, _, err := o.nomadClient.Allocations().List(options.WithContext(ctx))
8888
if err != nil {
8989
return nil, err
9090
}
9191

92-
result := make([]nodemanager.NomadServiceDiscovery, 0, len(nomadNodes))
93-
for _, n := range nomadNodes {
92+
result := make([]nodemanager.NomadServiceDiscovery, 0, len(nomadAllocations))
93+
for _, alloc := range nomadAllocations {
94+
if !isHealthy(alloc) {
95+
zap.L().Debug("Skipping unhealthy allocation", zap.String("allocation_id", alloc.ID))
96+
97+
continue
98+
}
99+
100+
ip, port, ok := o.findPortInAllocation(alloc)
101+
if !ok {
102+
zap.L().Warn("Cannot find port in allocation",
103+
zap.String("allocation_id", alloc.ID), zap.String("port_name", "grpc"))
104+
105+
continue
106+
}
107+
108+
zap.L().Debug("Found port in allocation",
109+
zap.String("allocation_id", alloc.ID),
110+
zap.String("port_name", "grpc"),
111+
zap.String("ip", ip),
112+
zap.Int("port", port),
113+
)
114+
94115
result = append(result, nodemanager.NomadServiceDiscovery{
95-
NomadNodeShortID: n.ID[:consts.NodeIDLength],
96-
OrchestratorAddress: fmt.Sprintf("%s:%s", n.Address, consts.OrchestratorPort),
97-
IPAddress: n.Address,
116+
NomadNodeShortID: alloc.NodeID[:consts.NodeIDLength],
117+
OrchestratorAddress: fmt.Sprintf("%s:%d", ip, port),
118+
IPAddress: ip,
98119
})
99120
}
100121

101122
return result, nil
102123
}
103124

125+
func isHealthy(alloc *nomadapi.AllocationListStub) bool {
126+
if alloc == nil {
127+
zap.L().Warn("Allocation is nil")
128+
129+
return false
130+
}
131+
132+
if alloc.DeploymentStatus == nil {
133+
zap.L().Warn("Allocation deployment status is nil", zap.String("allocation_id", alloc.ID))
134+
135+
return false
136+
}
137+
138+
if alloc.DeploymentStatus.Healthy == nil {
139+
zap.L().Warn("Allocation deployment status healthy is nil", zap.String("allocation_id", alloc.ID))
140+
141+
return false
142+
}
143+
144+
return *alloc.DeploymentStatus.Healthy
145+
}
146+
147+
func (o *Orchestrator) findPortInAllocation(allocation *nomadapi.AllocationListStub) (string, int, bool) {
148+
if allocation == nil {
149+
return "", 0, false
150+
}
151+
152+
if allocation.AllocatedResources == nil {
153+
return "", 0, false
154+
}
155+
156+
for _, task := range allocation.AllocatedResources.Tasks {
157+
for _, network := range task.Networks {
158+
host, port, ok := o.findPortInNetwork(network)
159+
if ok {
160+
return host, port, true
161+
}
162+
}
163+
}
164+
165+
for _, net := range allocation.AllocatedResources.Shared.Networks {
166+
host, port, ok := o.findPortInNetwork(net)
167+
if ok {
168+
return host, port, true
169+
}
170+
}
171+
172+
return "", 0, false
173+
}
174+
175+
func (o *Orchestrator) findPortInNetwork(net *nomadapi.NetworkResource) (string, int, bool) {
176+
for _, port := range net.ReservedPorts {
177+
if port.Label == o.portLabel {
178+
return net.IP, port.Value, true
179+
}
180+
181+
if port.Value == o.defaultPort {
182+
return net.IP, o.defaultPort, true
183+
}
184+
}
185+
186+
for _, port := range net.DynamicPorts {
187+
if port.Label == o.portLabel {
188+
return net.IP, port.Value, true
189+
}
190+
191+
if port.Value == o.defaultPort {
192+
return net.IP, o.defaultPort, true
193+
}
194+
}
195+
196+
return "", 0, false
197+
}
198+
104199
func (o *Orchestrator) GetNode(clusterID uuid.UUID, nodeID string) *nodemanager.Node {
105200
scopedKey := o.scopedNodeID(clusterID, nodeID)
106201
n, _ := o.nodes.Get(scopedKey)

0 commit comments

Comments
 (0)