Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions packages/client-proxy/internal/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ func catalogResolution(ctx context.Context, sandboxId string, c catalog.Sandboxe
func NewClientProxy(meterProvider metric.MeterProvider, serviceName string, port uint16, catalog catalog.SandboxesCatalog) (*reverseproxy.Proxy, error) {
proxy := reverseproxy.New(
port,
// Retries that are needed to handle port forwarding delays in sandbox envd are handled by the orchestrator proxy
reverseproxy.ClientProxyRetries,
idleTimeout,
func(r *http.Request) (*pool.Destination, error) {
sandboxId, port, err := reverseproxy.ParseHost(r.Host)
Expand Down
2 changes: 2 additions & 0 deletions packages/orchestrator/internal/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ type SandboxProxy struct {
func NewSandboxProxy(meterProvider metric.MeterProvider, port uint16, sandboxes *sandbox.Map) (*SandboxProxy, error) {
proxy := reverseproxy.New(
port,
// Retry 5 times to handle port forwarding delays in sandbox envd.
reverseproxy.SandboxProxyRetries,
idleTimeout,
func(r *http.Request) (*pool.Destination, error) {
sandboxId, port, err := reverseproxy.ParseHost(r.Host)
Expand Down
44 changes: 35 additions & 9 deletions packages/shared/pkg/proxy/pool/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type proxyClient struct {
func newProxyClient(
maxIdleConns,
maxHostIdleConns int,
maxConnectionAttempts int,
idleTimeout time.Duration,
totalConnsCounter *atomic.Uint64,
currentConnsCounter *atomic.Int64,
Expand All @@ -39,17 +40,42 @@ func newProxyClient(
ResponseHeaderTimeout: 0,
// TCP configuration
DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
conn, err := (&net.Dialer{
Timeout: 30 * time.Second, // Connect timeout (no timeout by default)
KeepAlive: 20 * time.Second, // Lower than our http keepalives (50 seconds)
}).DialContext(ctx, network, addr)
if err != nil {
return nil, err
}
var conn net.Conn
var err error

// Retry connection attempts to handle port forwarding delays in sandbox envd.
// When a process binds to localhost inside the sandbox, it can take up to 1s (delay is 1s + socat startup delay)
// for the port scanner to detect it and start socat forwarding to the host IP.
maxAttempts := max(maxConnectionAttempts, 1)
for attempt := range maxAttempts {
conn, err = (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 20 * time.Second,
}).DialContext(ctx, network, addr)

if err == nil {
totalConnsCounter.Add(1)
return tracking.NewConnection(conn, currentConnsCounter), nil
}

if ctx.Err() != nil {
return nil, ctx.Err()
}

totalConnsCounter.Add(1)
// Don't sleep on the last attempt
if attempt < maxAttempts-1 {
// Linear backoff: 100ms, 200ms, 300ms, 400ms
backoff := time.Duration(100*(attempt+1)) * time.Millisecond
select {
case <-time.After(backoff):
// Continue to next attempt
case <-ctx.Done():
return nil, ctx.Err()
}
}
}

return tracking.NewConnection(conn, currentConnsCounter), nil
return nil, err
},
DisableCompression: true, // No need to request or manipulate compression
}
Expand Down
21 changes: 12 additions & 9 deletions packages/shared/pkg/proxy/pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,20 @@ import (
const hostConnectionSplit = 4

type ProxyPool struct {
pool *smap.Map[*proxyClient]
maxClientConns int
idleTimeout time.Duration
totalConnsCounter atomic.Uint64
currentConnsCounter atomic.Int64
pool *smap.Map[*proxyClient]
maxClientConns int
maxConnectionAttempts int
idleTimeout time.Duration
totalConnsCounter atomic.Uint64
currentConnsCounter atomic.Int64
}

func New(maxClientConns int, idleTimeout time.Duration) *ProxyPool {
func New(maxClientConns int, maxConnectionAttempts int, idleTimeout time.Duration) *ProxyPool {
return &ProxyPool{
pool: smap.New[*proxyClient](),
maxClientConns: maxClientConns,
idleTimeout: idleTimeout,
pool: smap.New[*proxyClient](),
maxClientConns: maxClientConns,
maxConnectionAttempts: maxConnectionAttempts,
idleTimeout: idleTimeout,
}
}

Expand Down Expand Up @@ -58,6 +60,7 @@ func (p *ProxyPool) Get(d *Destination) *proxyClient {

return p.maxClientConns / hostConnectionSplit
}(),
p.maxConnectionAttempts,
p.idleTimeout,
&p.totalConnsCounter,
&p.currentConnsCounter,
Expand Down
9 changes: 9 additions & 0 deletions packages/shared/pkg/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,22 @@ type Proxy struct {
currentServerConnsCounter atomic.Int64
}

type MaxConnectionAttempts int

const (
ClientProxyRetries = 1
SandboxProxyRetries = 5
)

func New(
port uint16,
maxConnectionAttempts MaxConnectionAttempts,
idleTimeout time.Duration,
getDestination func(r *http.Request) (*pool.Destination, error),
) *Proxy {
p := pool.New(
maxClientConns,
int(maxConnectionAttempts),
idleTimeout,
)

Expand Down
Loading
Loading