Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .tool-versions
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
terraform 1.5.7
890 changes: 890 additions & 0 deletions go.work.sum

Large diffs are not rendered by default.

12 changes: 12 additions & 0 deletions packages/cluster/filestore/main.tf
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# Pin the Google provider so the filestore resources in this module are
# created with a known, tested provider version.
terraform {
required_providers {
google = {
source = "hashicorp/google"
version = "6.13.0"
}
}
}
# Filestore instance backing the cluster's shared template cache.
#
# NOTE(review): the original declared an empty resource label with empty
# name/tier, which is not valid. The provider additionally requires
# `location`, `file_shares`, and `networks` arguments for this resource —
# fill those in before applying.
resource "google_filestore_instance" "cache" {
  name = var.filestore_name
  tier = "BASIC_HDD" # TODO(review): confirm the intended service tier
}
3 changes: 3 additions & 0 deletions packages/cluster/filestore/variables.tf
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
variable "filestore_name" {
  description = "Name of the google_filestore_instance created by this module."
  type        = string
}
6 changes: 6 additions & 0 deletions packages/cluster/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -324,3 +324,9 @@ module "network" {
labels = var.labels
prefix = var.prefix
}

# Shared Filestore-backed cache for the cluster; the instance name is derived
# from the deployment prefix.
module "filestore" {
source = "./filestore"

filestore_name = "${var.prefix}cache"
}
3 changes: 2 additions & 1 deletion packages/orchestrator/cmd/build-template/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (

"github.com/e2b-dev/infra/packages/orchestrator/internal/proxy"
"github.com/e2b-dev/infra/packages/orchestrator/internal/sandbox"
"github.com/e2b-dev/infra/packages/orchestrator/internal/sandbox/block"
"github.com/e2b-dev/infra/packages/orchestrator/internal/sandbox/nbd"
"github.com/e2b-dev/infra/packages/orchestrator/internal/sandbox/network"
sbxtemplate "github.com/e2b-dev/infra/packages/orchestrator/internal/sandbox/template"
Expand Down Expand Up @@ -97,7 +98,7 @@ func buildTemplate(
}
}()

persistenceTemplate, err := storage.GetTemplateStorageProvider(ctx, nil)
persistenceTemplate, err := storage.GetTemplateStorageProvider(ctx, nil, block.ChunkSize)
if err != nil {
return fmt.Errorf("could not create storage provider: %w", err)
}
Expand Down
3 changes: 2 additions & 1 deletion packages/orchestrator/cmd/inspect-data/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"log"

"github.com/e2b-dev/infra/packages/orchestrator/internal/sandbox/block"
"github.com/e2b-dev/infra/packages/shared/pkg/storage"
)

Expand Down Expand Up @@ -42,7 +43,7 @@ func main() {

ctx := context.Background()

storage, err := storage.GetTemplateStorageProvider(ctx, nil)
storage, err := storage.GetTemplateStorageProvider(ctx, nil, block.ChunkSize)
if err != nil {
log.Fatalf("failed to get storage provider: %s", err)
}
Expand Down
3 changes: 2 additions & 1 deletion packages/orchestrator/cmd/inspect-header/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"log"
"unsafe"

"github.com/e2b-dev/infra/packages/orchestrator/internal/sandbox/block"
"github.com/e2b-dev/infra/packages/shared/pkg/storage"
"github.com/e2b-dev/infra/packages/shared/pkg/storage/header"
)
Expand All @@ -33,7 +34,7 @@ func main() {
}

ctx := context.Background()
storage, err := storage.GetTemplateStorageProvider(ctx, nil)
storage, err := storage.GetTemplateStorageProvider(ctx, nil, block.ChunkSize)
if err != nil {
log.Fatalf("failed to get storage provider: %s", err)
}
Expand Down
3 changes: 2 additions & 1 deletion packages/orchestrator/cmd/simulate-headers-merge/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/google/uuid"

"github.com/e2b-dev/infra/packages/orchestrator/internal/sandbox/block"
"github.com/e2b-dev/infra/packages/shared/pkg/storage"
"github.com/e2b-dev/infra/packages/shared/pkg/storage/header"
)
Expand Down Expand Up @@ -45,7 +46,7 @@ func main() {

ctx := context.Background()

storage, err := storage.GetTemplateStorageProvider(ctx, nil)
storage, err := storage.GetTemplateStorageProvider(ctx, nil, block.ChunkSize)
if err != nil {
log.Fatalf("failed to get storage provider: %s", err)
}
Expand Down
3 changes: 2 additions & 1 deletion packages/orchestrator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/e2b-dev/infra/packages/orchestrator/internal/metrics"
"github.com/e2b-dev/infra/packages/orchestrator/internal/proxy"
"github.com/e2b-dev/infra/packages/orchestrator/internal/sandbox"
"github.com/e2b-dev/infra/packages/orchestrator/internal/sandbox/block"
"github.com/e2b-dev/infra/packages/orchestrator/internal/sandbox/nbd"
"github.com/e2b-dev/infra/packages/orchestrator/internal/sandbox/network"
"github.com/e2b-dev/infra/packages/orchestrator/internal/sandbox/template"
Expand Down Expand Up @@ -248,7 +249,7 @@ func run(port, proxyPort uint) (success bool) {
zap.L().Fatal("failed to create limiter", zap.Error(err))
}

persistence, err := storage.GetTemplateStorageProvider(ctx, limiter)
persistence, err := storage.GetTemplateStorageProvider(ctx, limiter, block.ChunkSize)
if err != nil {
zap.L().Fatal("failed to create template storage provider", zap.Error(err))
}
Expand Down
200 changes: 200 additions & 0 deletions packages/shared/pkg/storage/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
package storage

import (
"context"
"fmt"
"io"
"os"
"path/filepath"
"time"

"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)

// CachedProvider wraps another StorageProvider with a read-through cache of
// object contents kept as files on the local filesystem under rootPath.
type CachedProvider struct {
rootPath string // local directory that cached object files live under
chunkSize int // chunk granularity, embedded in per-chunk cache filenames
inner StorageProvider // authoritative backing provider
}

// NewCachedProvider returns a CachedProvider that caches objects from inner
// under rootPath. chunksize is used when naming per-chunk cache files.
func NewCachedProvider(rootPath string, chunksize int, inner StorageProvider) *CachedProvider {
return &CachedProvider{rootPath: rootPath, inner: inner, chunkSize: chunksize}
}

// DeleteObjectsWithPrefix removes all objects under prefix from the backing
// provider and, in the background, evicts any locally cached copies.
//
// The eviction goroutine runs on a detached context so it is not cancelled
// together with the caller. Any panic raised by the (currently
// unimplemented) local eviction is contained here: an uncaught panic in a
// goroutine would otherwise crash the whole process.
func (c CachedProvider) DeleteObjectsWithPrefix(ctx context.Context, prefix string) error {
	go func(ctx context.Context) {
		defer func() {
			if r := recover(); r != nil {
				zap.L().Error("panic while evicting cached objects",
					zap.String("prefix", prefix), zap.Any("panic", r))
			}
		}()
		c.deleteObjectsWithPrefix(ctx, prefix)
	}(context.WithoutCancel(ctx))

	return c.inner.DeleteObjectsWithPrefix(ctx, prefix)
}

// UploadSignedURL delegates signed-URL creation to the wrapped provider.
// Uploads performed through the signed URL bypass the local cache entirely.
func (c CachedProvider) UploadSignedURL(ctx context.Context, path string, ttl time.Duration) (string, error) {
return c.inner.UploadSignedURL(ctx, path, ttl)
}

// OpenObject opens the object at path in the wrapped provider and returns a
// handle that transparently caches its contents under the provider's root.
func (c CachedProvider) OpenObject(ctx context.Context, path string) (StorageObjectProvider, error) {
	remote, err := c.inner.OpenObject(ctx, path)
	if err != nil {
		return nil, err
	}

	cached := &CachedFileObjectProvider{
		path:      filepath.Join(c.rootPath, path),
		chunkSize: c.chunkSize,
		inner:     remote,
	}

	return cached, nil
}

// GetDetails describes this provider and the provider it wraps, for logging.
// (Fixes the "Cachine" typo in the original message.)
func (c CachedProvider) GetDetails() string {
	return fmt.Sprintf("[Caching file storage, base path set to %s, which wraps %s]",
		c.rootPath, c.inner.GetDetails())
}

// deleteObjectsWithPrefix evicts locally cached objects under prefix.
//
// TODO(review): local eviction is not implemented yet. The original body was
// panic("implement me"); because this method is invoked from a goroutine,
// that panic would crash the entire process on every
// DeleteObjectsWithPrefix call. Log and skip until eviction is implemented.
func (c CachedProvider) deleteObjectsWithPrefix(ctx context.Context, prefix string) {
	zap.L().Warn("local cache eviction not implemented; skipping",
		zap.String("prefix", prefix))
}

var _ StorageProvider = (*CachedProvider)(nil)

// CachedFileObjectProvider is a StorageObjectProvider that mirrors the
// wrapped object's bytes into a file at path on the local filesystem and
// serves subsequent reads from that file when it exists.
type CachedFileObjectProvider struct {
path string // local filesystem path of the cached copy
chunkSize int // chunk granularity, embedded in per-chunk cache filenames
inner StorageObjectProvider // authoritative remote object
}

var _ StorageObjectProvider = (*CachedFileObjectProvider)(nil)

// makeChunkFilename returns the cache filename for the chunk starting at
// offset: "<path>/<12-digit zero-padded offset>-<chunk size>.bin".
func (c *CachedFileObjectProvider) makeChunkFilename(offset int) string {
	chunkName := fmt.Sprintf("%012d-%d.bin", offset, c.chunkSize)
	return c.path + "/" + chunkName
}

// WriteTo streams the object's contents into dst. A cached copy on local
// disk is served directly when present; otherwise the bytes are fetched
// from the wrapped provider and mirrored into the cache file as they stream
// through. Returns the number of bytes written to dst.
func (c *CachedFileObjectProvider) WriteTo(dst io.Writer) (int64, error) {
	// Fast path: serve from the local cache when the file exists.
	if local, err := os.Open(c.path); err == nil {
		defer func() {
			if closeErr := local.Close(); closeErr != nil {
				zap.L().Error("error on closing file", zap.Error(closeErr))
			}
		}()
		return io.Copy(dst, local)
	}

	// Cache miss: create the cache file so we can populate it while
	// streaming to dst. (The previous version reused the nil *os.File
	// returned by the failed Open inside io.MultiWriter, which made every
	// write fail with os.ErrInvalid.) Caching stays best-effort: if the file
	// cannot be created, stream straight from the remote.
	if err := os.MkdirAll(filepath.Dir(c.path), 0o755); err != nil {
		zap.L().Error("error on creating cache dir", zap.String("path", c.path), zap.Error(err))
		return c.inner.WriteTo(dst)
	}
	cache, err := os.Create(c.path)
	if err != nil {
		zap.L().Error("error on creating cache file", zap.String("path", c.path), zap.Error(err))
		return c.inner.WriteTo(dst)
	}
	defer func() {
		if closeErr := cache.Close(); closeErr != nil {
			zap.L().Error("error on closing cache file", zap.Error(closeErr))
		}
	}()

	n, err := c.inner.WriteTo(io.MultiWriter(cache, dst))
	if err != nil {
		// Don't leave a truncated cache file behind on a failed stream.
		if rmErr := os.Remove(c.path); rmErr != nil && !os.IsNotExist(rmErr) {
			zap.L().Error("error on removing partial cache file", zap.String("path", c.path), zap.Error(rmErr))
		}
	}
	return n, err
}

// WriteFromFileSystem uploads the file at path to the wrapped provider and,
// concurrently, copies it into the local cache. The cache copy is
// best-effort: its failure is logged but never fails the upload. The file is
// opened twice because the provider API does not lend itself to a single
// io.MultiWriter pass.
func (c *CachedFileObjectProvider) WriteFromFileSystem(path string) error {
	var eg errgroup.Group

	// Authoritative upload; its error is the one callers see.
	eg.Go(func() error {
		return c.inner.WriteFromFileSystem(path)
	})

	// Best-effort cache population.
	eg.Go(func() error {
		if cacheErr := c.writeFromFile(path); cacheErr != nil {
			zap.L().Error("error on cache write", zap.String("path", c.path), zap.Error(cacheErr))
		}
		return nil
	})

	return eg.Wait()
}

// ReadFrom persists src into the local cache file first, then uploads that
// file to the wrapped provider. The reader can only be consumed once, so it
// is materialized on disk before the remote write starts.
//
// NOTE(review): the returned byte count is always 0, as in the original —
// presumably callers ignore it; confirm before relying on it.
func (c *CachedFileObjectProvider) ReadFrom(src io.Reader) (int64, error) {
	if err := c.writeToLocal(src); err != nil {
		return 0, err
	}

	return 0, c.inner.WriteFromFileSystem(c.path)
}

// ReadAt reads len(buff) bytes at offset off, preferring the local cache
// file and falling back to the wrapped provider when the cache is absent.
//
// The cache file handle is now closed before returning; the previous
// version leaked one file descriptor per cached read.
func (c *CachedFileObjectProvider) ReadAt(buff []byte, off int64) (n int, err error) {
	fp, openErr := os.Open(c.path)
	if openErr == nil {
		defer func() {
			if closeErr := fp.Close(); closeErr != nil {
				zap.L().Error("error on closing file", zap.Error(closeErr))
			}
		}()
		return fp.ReadAt(buff, off)
	}

	// todo: cache chunk
	return c.inner.ReadAt(buff, off)
}

// Size returns the object's size in bytes, using the local cache file when
// present and falling back to the wrapped provider otherwise.
func (c *CachedFileObjectProvider) Size() (int64, error) {
	stat, err := os.Stat(c.path)
	if err == nil {
		return stat.Size(), nil
	}

	// A missing cache file is the normal miss path; only unexpected stat
	// failures are worth logging. (The old code logged every miss at Error.)
	if !os.IsNotExist(err) {
		zap.L().Error("error on cache size read", zap.String("path", c.path), zap.Error(err))
	}
	return c.inner.Size()
}

// Delete removes the object from the wrapped provider and, asynchronously
// and best-effort, removes the local cache file. A missing cache file is
// not an error.
func (c *CachedFileObjectProvider) Delete() error {
	go func() {
		removeErr := os.Remove(c.path)
		if ignoreFileMissingError(removeErr) != nil {
			zap.L().Error("error on cache delete", zap.String("path", c.path), zap.Error(removeErr))
		}
	}()

	return c.inner.Delete()
}

// writeToLocal copies src into the cache file at c.path, truncating any
// existing copy. Parent directories are created on demand: c.path is the
// cache root joined with the object path, so its directory typically does
// not exist before the first write (the original failed here).
func (c *CachedFileObjectProvider) writeToLocal(src io.Reader) error {
	if err := os.MkdirAll(filepath.Dir(c.path), 0o755); err != nil {
		return fmt.Errorf("error on creating cache dir: %w", err)
	}

	dst, err := os.OpenFile(c.path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0o666)
	if err != nil {
		return fmt.Errorf("error on open cache file: %w", err)
	}
	defer func() {
		if err := dst.Close(); err != nil {
			zap.L().Error("error on closing local file", zap.Error(err))
		}
	}()

	if _, err := io.Copy(dst, src); err != nil {
		return fmt.Errorf("error on writing cache file: %w", err)
	}

	return nil
}

// writeFromFile copies the file at path (the upload source on the local
// filesystem) into the cache file at c.path.
func (c *CachedFileObjectProvider) writeFromFile(path string) error {
	fp, err := os.Open(path)
	if err != nil {
		// This opens the source file, not the cache file; the original
		// message blamed the wrong file.
		return fmt.Errorf("error on open source file: %w", err)
	}
	defer func() {
		if err := fp.Close(); err != nil {
			zap.L().Error("error on closing file", zap.Error(err))
		}
	}()

	if err = c.writeToLocal(fp); err != nil {
		return fmt.Errorf("error on writing cache file: %w", err)
	}

	return nil
}

func ignoreFileMissingError(err error) error {
if os.IsNotExist(err) {
return nil
}

return err
}
13 changes: 13 additions & 0 deletions packages/shared/pkg/storage/cache_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package storage

import (
"testing"

"github.com/stretchr/testify/assert"
)

// Verifies the chunk-filename layout: a 12-digit zero-padded offset and the
// chunk size, joined under the object's cache path.
func TestCachedFileObjectProvider_MakeChunkFilename(t *testing.T) {
	provider := CachedFileObjectProvider{path: "/a/b/c", chunkSize: 1024}
	got := provider.makeChunkFilename(4192)
	assert.Equal(t, "/a/b/c/000000004192-1024.bin", got)
}
15 changes: 14 additions & 1 deletion packages/shared/pkg/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,20 @@ type StorageObjectProvider interface {
Delete() error
}

func GetTemplateStorageProvider(ctx context.Context, limiter *limit.Limiter) (StorageProvider, error) {
// GetTemplateStorageProvider builds the template storage provider from the
// environment and, when LOCAL_TEMPLATE_CACHE_PATH is set, wraps it in a
// local filesystem cache using chunkSize for cached chunk naming.
func GetTemplateStorageProvider(ctx context.Context, limiter *limit.Limiter, chunkSize int) (StorageProvider, error) {
	base, err := getTemplateStorageProvider(ctx, limiter)
	if err != nil {
		return nil, err
	}

	cachePath := env.GetEnv("LOCAL_TEMPLATE_CACHE_PATH", "")
	if cachePath == "" {
		return base, nil
	}

	return NewCachedProvider(cachePath, chunkSize, base), nil
}

func getTemplateStorageProvider(ctx context.Context, limiter *limit.Limiter) (StorageProvider, error) {
provider := Provider(env.GetEnv(storageProviderEnv, string(DefaultStorageProvider)))

if provider == LocalStorageProvider {
Expand Down
4 changes: 4 additions & 0 deletions packages/shared/pkg/storage/storage_google.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ type GCPBucketStorageProvider struct {
limiter *limit.Limiter
}

var _ StorageProvider = (*GCPBucketStorageProvider)(nil)

type GCPBucketStorageObjectProvider struct {
storage *GCPBucketStorageProvider
path string
Expand All @@ -48,6 +50,8 @@ type GCPBucketStorageObjectProvider struct {
limiter *limit.Limiter
}

var _ StorageObjectProvider = (*GCPBucketStorageObjectProvider)(nil)

func NewGCPBucketStorageProvider(ctx context.Context, bucketName string, limiter *limit.Limiter) (*GCPBucketStorageProvider, error) {
client, err := storage.NewClient(ctx)
if err != nil {
Expand Down
Loading