Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions packages/orchestrator/internal/sandbox/diffcreator.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@ type DiffCreator interface {
}

type RootfsDiffCreator struct {
rootfs rootfs.Provider
stopHook func(context.Context) error
rootfs rootfs.Provider
closeHook func(context.Context) error
}

func (r *RootfsDiffCreator) process(ctx context.Context, out io.Writer) (*header.DiffMetadata, error) {
return r.rootfs.ExportDiff(ctx, out, r.stopHook)
return r.rootfs.ExportDiff(ctx, out, r.closeHook)
}

type MemoryDiffCreator struct {
Expand Down
4 changes: 2 additions & 2 deletions packages/orchestrator/internal/sandbox/rootfs/nbd.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (o *NBDProvider) Start(ctx context.Context) error {
func (o *NBDProvider) ExportDiff(
parentCtx context.Context,
out io.Writer,
stopSandbox func(ctx context.Context) error,
closeSandbox func(ctx context.Context) error,
) (*header.DiffMetadata, error) {
childCtx, childSpan := o.tracer.Start(parentCtx, "cow-export")
defer childSpan.End()
Expand All @@ -85,7 +85,7 @@ func (o *NBDProvider) ExportDiff(

// the error is already logged in go routine in SandboxCreate handler
go func() {
err := stopSandbox(childCtx)
err := closeSandbox(childCtx)
if err != nil {
zap.L().Error("error stopping sandbox on cow export", zap.Error(err))
}
Expand Down
2 changes: 1 addition & 1 deletion packages/orchestrator/internal/sandbox/rootfs/rootfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,5 @@ type Provider interface {
Start(ctx context.Context) error
Close(ctx context.Context) error
Path() (string, error)
ExportDiff(ctx context.Context, out io.Writer, stopSandbox func(context.Context) error) (*header.DiffMetadata, error)
ExportDiff(ctx context.Context, out io.Writer, closeSandbox func(context.Context) error) (*header.DiffMetadata, error)
}
101 changes: 57 additions & 44 deletions packages/orchestrator/internal/sandbox/sandbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,9 @@ type RuntimeMetadata struct {
}

type Resources struct {
Slot *network.Slot
rootfs rootfs.Provider
memory uffd.MemoryBackend
uffdExit chan error
Slot *network.Slot
rootfs rootfs.Provider
memory uffd.MemoryBackend
}

type Metadata struct {
Expand All @@ -99,6 +98,8 @@ type Sandbox struct {
Checks *Checks

APIStoredConfig *orchestrator.SandboxConfig

exit *utils.SetOnce[struct{}]
}

func (s *Sandbox) LoggerMetadata() sbxlogger.SandboxMetadata {
Expand Down Expand Up @@ -131,6 +132,8 @@ func CreateSandbox(
childCtx, childSpan := tracer.Start(ctx, "create-sandbox")
defer childSpan.End()

exit := utils.NewSetOnce[struct{}]()

cleanup := NewCleanup()
defer func() {
if e != nil {
Expand Down Expand Up @@ -248,10 +251,9 @@ func CreateSandbox(
telemetry.ReportEvent(childCtx, "created fc process")

resources := &Resources{
Slot: ips.slot,
rootfs: rootfsProvider,
memory: uffd.NewNoopMemory(memfileSize, memfile.BlockSize()),
uffdExit: make(chan error, 1),
Slot: ips.slot,
rootfs: rootfsProvider,
memory: uffd.NewNoopMemory(memfileSize, memfile.BlockSize()),
}

metadata := &Metadata{
Expand All @@ -273,6 +275,8 @@ func CreateSandbox(
cleanup: cleanup,

APIStoredConfig: apiConfigToStore,

exit: exit,
}

checks, err := NewChecks(ctx, tracer, sbx, false)
Expand All @@ -282,9 +286,18 @@ func CreateSandbox(
sbx.Checks = checks

cleanup.AddPriority(func(ctx context.Context) error {
return sbx.Close(ctx, tracer)
// Stop the sandbox first if it is still running, otherwise do nothing
return sbx.Stop(ctx, tracer)
})

go func() {
// If the process exits, stop the sandbox properly
_, fcErr := fcHandle.Exit.Wait()
err := sbx.Stop(context.WithoutCancel(ctx), tracer)

exit.SetResult(struct{}{}, errors.Join(err, fcErr))
}()

return sbx, nil
}

Expand All @@ -307,6 +320,8 @@ func ResumeSandbox(
childCtx, childSpan := tracer.Start(ctx, "resume-sandbox")
defer childSpan.End()

exit := utils.NewSetOnce[struct{}]()

cleanup := NewCleanup()
defer func() {
if e != nil {
Expand Down Expand Up @@ -379,18 +394,16 @@ func ResumeSandbox(
return nil, fmt.Errorf("failed to serve memory: %w", err)
}

// ==== END of resources initialization ====
uffdStartCtx, cancelUffdStartCtx := context.WithCancelCause(ctx)
defer cancelUffdStartCtx(fmt.Errorf("uffd finished starting"))

uffdExit := make(chan error, 1)
go func() {
uffdWaitErr := <-fcUffd.Exit()
uffdExit <- uffdWaitErr
_, uffdWaitErr := fcUffd.Exit().Wait()

cancelUffdStartCtx(fmt.Errorf("uffd process exited: %w", errors.Join(uffdWaitErr, context.Cause(uffdStartCtx))))
}()

// ==== END of resources initialization ====
rootfsPath, err := rootfsOverlay.Path()
if err != nil {
return nil, fmt.Errorf("failed to get rootfs path: %w", err)
Expand Down Expand Up @@ -452,10 +465,9 @@ func ResumeSandbox(
telemetry.ReportEvent(childCtx, "initialized FC")

resources := &Resources{
Slot: ips.slot,
rootfs: rootfsOverlay,
memory: fcUffd,
uffdExit: uffdExit,
Slot: ips.slot,
rootfs: rootfsOverlay,
memory: fcUffd,
}

metadata := &Metadata{
Expand All @@ -477,6 +489,8 @@ func ResumeSandbox(
cleanup: cleanup,

APIStoredConfig: apiConfigToStore,

exit: exit,
}

// Part of the sandbox as we need to stop Checks before pausing the sandbox
Expand All @@ -489,7 +503,8 @@ func ResumeSandbox(
sbx.Checks = checks

cleanup.AddPriority(func(ctx context.Context) error {
return sbx.Close(ctx, tracer)
// Stop the sandbox first if it is still running, otherwise do nothing
return sbx.Stop(ctx, tracer)
})

err = sbx.WaitForEnvd(
Expand All @@ -503,40 +518,38 @@ func ResumeSandbox(

go sbx.Checks.Start()

return sbx, nil
}
go func() {
// Wait for either uffd or fc process to exit
select {
case <-fcUffd.Exit().Done:
case <-fcHandle.Exit.Done:
}

func (s *Sandbox) Wait(ctx context.Context) error {
select {
case <-ctx.Done():
return ctx.Err()
case <-s.process.Exit.Done:
_, fcErr := s.process.Exit.Result()
stopErr := s.Stop(ctx)
uffdErr := <-s.uffdExit
err := sbx.Stop(context.WithoutCancel(ctx), tracer)

return errors.Join(fcErr, stopErr, uffdErr)
case uffdErr := <-s.uffdExit:
stopErr := s.Stop(ctx)
_, uffdWaitErr := fcUffd.Exit().Wait()
_, fcErr := fcHandle.Exit.Wait()
exit.SetResult(struct{}{}, errors.Join(err, fcErr, uffdWaitErr))
}()

_, fcErr := s.process.Exit.WaitWithContext(ctx)
return sbx, nil
}

return errors.Join(uffdErr, stopErr, fcErr)
}
// Wait returns the error reported by s.exit.WaitWithContext(ctx); the
// accompanying value is discarded.
//
// The exit result is set by the goroutine started in CreateSandbox /
// ResumeSandbox after sbx.Stop has run, and carries the joined stop and
// process-exit errors.
// NOTE(review): presumably this blocks until that result is set or ctx is
// done — confirm against utils.SetOnce.WaitWithContext.
func (s *Sandbox) Wait(ctx context.Context) error {
_, err := s.exit.WaitWithContext(ctx)
return err
}

// Close starts the cleanup process for the sandbox.
func (s *Sandbox) Stop(ctx context.Context) error {
func (s *Sandbox) Close(ctx context.Context) error {
err := s.cleanup.Run(ctx)
if err != nil {
sbxlogger.I(s).Error("failed to stop sandbox", zap.Error(err))
return fmt.Errorf("failed to stop sandbox: %w", err)
return fmt.Errorf("failed to cleanup sandbox: %w", err)
}

return nil
}

func (s *Sandbox) Close(ctx context.Context, tracer trace.Tracer) error {
// Stop kills the sandbox.
func (s *Sandbox) Stop(ctx context.Context, tracer trace.Tracer) error {
_, span := tracer.Start(ctx, "sandbox-close")
defer span.End()

Expand Down Expand Up @@ -658,8 +671,8 @@ func (s *Sandbox) Pause(
buildID,
originalRootfs.Header(),
&RootfsDiffCreator{
rootfs: s.rootfs,
stopHook: s.Stop,
rootfs: s.rootfs,
closeHook: s.Close,
},
)
if err != nil {
Expand Down Expand Up @@ -894,8 +907,8 @@ func (s *Sandbox) WaitForExit(
return fmt.Errorf("waiting for exit took too long")
case <-ctx.Done():
return nil
case <-s.process.Exit.Done:
_, err := s.process.Exit.Result()
case <-s.exit.Done:
_, err := s.exit.Result()
if err == nil {
return nil
}
Expand Down
12 changes: 6 additions & 6 deletions packages/orchestrator/internal/sandbox/uffd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/e2b-dev/infra/packages/orchestrator/internal/sandbox/uffd/fdexit"
"github.com/e2b-dev/infra/packages/orchestrator/internal/sandbox/uffd/mapping"
"github.com/e2b-dev/infra/packages/shared/pkg/logger"
"github.com/e2b-dev/infra/packages/shared/pkg/utils"
)

const (
Expand All @@ -25,7 +26,7 @@ const (
)

type Uffd struct {
exitCh chan error
exit *utils.SetOnce[struct{}]
readyCh chan struct{}

fdExit *fdexit.FdExit
Expand All @@ -48,7 +49,7 @@ func New(memfile block.ReadonlyDevice, socketPath string, blockSize int64) (*Uff
}

return &Uffd{
exitCh: make(chan error, 1),
exit: utils.NewSetOnce[struct{}](),
readyCh: make(chan struct{}, 1),
fdExit: fdExit,
memfile: trackedMemfile,
Expand All @@ -75,10 +76,9 @@ func (u *Uffd) Start(sandboxId string) error {
closeErr := u.lis.Close()
fdExitErr := u.fdExit.Close()

u.exitCh <- errors.Join(handleErr, closeErr, fdExitErr)
u.exit.SetResult(struct{}{}, errors.Join(handleErr, closeErr, fdExitErr))

close(u.readyCh)
close(u.exitCh)
}()

return nil
Expand Down Expand Up @@ -165,8 +165,8 @@ func (u *Uffd) Ready() chan struct{} {
return u.readyCh
}

func (u *Uffd) Exit() chan error {
return u.exitCh
func (u *Uffd) Exit() *utils.SetOnce[struct{}] {
return u.exit
}

func (u *Uffd) TrackAndReturnNil() error {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package uffd

import "github.com/bits-and-blooms/bitset"
import (
"github.com/bits-and-blooms/bitset"

"github.com/e2b-dev/infra/packages/shared/pkg/utils"
)

type MemoryBackend interface {
Disable() error
Expand All @@ -9,5 +13,5 @@ type MemoryBackend interface {
Start(sandboxId string) error
Stop() error
Ready() chan struct{}
Exit() chan error
Exit() *utils.SetOnce[struct{}]
}
10 changes: 7 additions & 3 deletions packages/orchestrator/internal/sandbox/uffd/noop.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,16 @@ import (
"github.com/bits-and-blooms/bitset"

"github.com/e2b-dev/infra/packages/shared/pkg/storage/header"
"github.com/e2b-dev/infra/packages/shared/pkg/utils"
)

// NoopMemory is the memory backend handed to a sandbox created via
// CreateSandbox (see uffd.NewNoopMemory usage there).
// NOTE(review): presumably used when no uffd handler serves the memory —
// confirm against the MemoryBackend implementations.
type NoopMemory struct {
// size and blockSize are stored as passed to NewNoopMemory.
size int64
blockSize int64

// dirty is the bitset built in NewNoopMemory.
// NOTE(review): presumably tracks dirty blocks — confirm in the constructor.
dirty *bitset.BitSet

// exit is returned by Exit() and has its value set by Stop().
exit *utils.SetOnce[struct{}]
}

func NewNoopMemory(size, blockSize int64) *NoopMemory {
Expand All @@ -23,6 +26,7 @@ func NewNoopMemory(size, blockSize int64) *NoopMemory {
size: size,
blockSize: blockSize,
dirty: dirty,
exit: utils.NewSetOnce[struct{}](),
}
}

Expand All @@ -39,7 +43,7 @@ func (m *NoopMemory) Start(sandboxId string) error {
}

func (m *NoopMemory) Stop() error {
return nil
return m.exit.SetValue(struct{}{})
}

func (m *NoopMemory) Ready() chan struct{} {
Expand All @@ -48,6 +52,6 @@ func (m *NoopMemory) Ready() chan struct{} {
return ch
}

func (m *NoopMemory) Exit() chan error {
return make(chan error)
func (m *NoopMemory) Exit() *utils.SetOnce[struct{}] {
return m.exit
}
6 changes: 3 additions & 3 deletions packages/orchestrator/internal/server/sandboxes.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func (s *server) Create(ctx context.Context, req *orchestrator.SandboxCreateRequ
sbxlogger.I(sbx).Error("failed to wait for sandbox, cleaning up", zap.Error(waitErr))
}

cleanupErr := sbx.Stop(ctx)
cleanupErr := sbx.Close(ctx)
if cleanupErr != nil {
sbxlogger.I(sbx).Error("failed to cleanup sandbox, will remove from cache", zap.Error(cleanupErr))
}
Expand Down Expand Up @@ -304,7 +304,7 @@ func (s *server) Delete(ctxConn context.Context, in *orchestrator.SandboxDeleteR
// Start the cleanup in a goroutine—the initial kill request should be sent as the first thing in stop, and at this point you cannot route to the sandbox anymore.
// We don't wait for the whole cleanup to finish here.
go func() {
err := sbx.Stop(ctx)
err := sbx.Close(ctx)
if err != nil {
sbxlogger.I(sbx).Error("error stopping sandbox", logger.WithSandboxID(in.SandboxId), zap.Error(err))
}
Expand Down Expand Up @@ -372,7 +372,7 @@ func (s *server) Pause(ctx context.Context, in *orchestrator.SandboxPauseRequest
ctx, childSpan := s.tracer.Start(ctx, "sandbox-pause-stop")
defer childSpan.End()

err := sbx.Stop(ctx)
err := sbx.Close(ctx)
if err != nil {
sbxlogger.I(sbx).Error("error stopping sandbox after snapshot", logger.WithSandboxID(in.SandboxId), zap.Error(err))
}
Expand Down
Loading
Loading