Skip to content

Commit 56f4a47

Browse files
wswsmaoilyee
andcommitted
Add fuse-manager
Signed-off-by: abushwang <[email protected]> Co-authored-by: Zuti He <[email protected]>
1 parent ff392c1 commit 56f4a47

File tree

16 files changed

+1829
-120
lines changed

16 files changed

+1829
-120
lines changed

Makefile

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ REVISION=$(shell git rev-parse HEAD)$(shell if ! git diff --no-ext-diff --quiet
2424
GO_BUILD_LDFLAGS ?= -s -w
2525
GO_LD_FLAGS=-ldflags '$(GO_BUILD_LDFLAGS) -X $(PKG)/version.Version=$(VERSION) -X $(PKG)/version.Revision=$(REVISION) $(GO_EXTRA_LDFLAGS)'
2626

27-
CMD=containerd-stargz-grpc ctr-remote stargz-store
27+
CMD=containerd-stargz-grpc ctr-remote stargz-store stargz-fuse-manager
2828

2929
CMD_BINARIES=$(addprefix $(PREFIX),$(CMD))
3030

@@ -48,6 +48,9 @@ stargz-store: FORCE
4848
stargz-store-helper: FORCE
4949
cd cmd/ ; GO111MODULE=$(GO111MODULE_VALUE) go build -o $(PREFIX)$@ $(GO_BUILD_FLAGS) $(GO_LD_FLAGS) -v ./stargz-store/helper
5050

51+
stargz-fuse-manager: FORCE
52+
cd cmd/ ; GO111MODULE=$(GO111MODULE_VALUE) go build -o $(PREFIX)$@ $(GO_BUILD_FLAGS) $(GO_LD_FLAGS) -v ./stargz-fuse-manager
53+
5154
check:
5255
@echo "$@"
5356
@GO111MODULE=$(GO111MODULE_VALUE) $(shell go env GOPATH)/bin/golangci-lint run
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
/*
2+
Copyright The containerd Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package fsopts
18+
19+
import (
20+
"context"
21+
"fmt"
22+
"io"
23+
"path/filepath"
24+
25+
"github.com/containerd/log"
26+
dbmetadata "github.com/containerd/stargz-snapshotter/cmd/containerd-stargz-grpc/db"
27+
ipfs "github.com/containerd/stargz-snapshotter/cmd/containerd-stargz-grpc/ipfs"
28+
"github.com/containerd/stargz-snapshotter/fs"
29+
"github.com/containerd/stargz-snapshotter/metadata"
30+
memorymetadata "github.com/containerd/stargz-snapshotter/metadata/memory"
31+
bolt "go.etcd.io/bbolt"
32+
)
33+
34+
type Config struct {
35+
EnableIpfs bool
36+
MetadataStore string
37+
}
38+
39+
const (
40+
memoryMetadataType = "memory"
41+
dbMetadataType = "db"
42+
)
43+
44+
func ConfigFsOpts(ctx context.Context, rootDir string, config *Config) ([]fs.Option, error) {
45+
fsOpts := []fs.Option{fs.WithMetricsLogLevel(log.InfoLevel)}
46+
47+
if config.EnableIpfs {
48+
fsOpts = append(fsOpts, fs.WithResolveHandler("ipfs", new(ipfs.ResolveHandler)))
49+
}
50+
51+
mt, err := getMetadataStore(rootDir, config)
52+
if err != nil {
53+
log.G(ctx).WithError(err).Fatalf("failed to configure metadata store")
54+
}
55+
fsOpts = append(fsOpts, fs.WithMetadataStore(mt))
56+
57+
return fsOpts, nil
58+
}
59+
60+
func getMetadataStore(rootDir string, config *Config) (metadata.Store, error) {
61+
switch config.MetadataStore {
62+
case "", memoryMetadataType:
63+
return memorymetadata.NewReader, nil
64+
case dbMetadataType:
65+
bOpts := bolt.Options{
66+
NoFreelistSync: true,
67+
InitialMmapSize: 64 * 1024 * 1024,
68+
FreelistType: bolt.FreelistMapType,
69+
}
70+
db, err := bolt.Open(filepath.Join(rootDir, "metadata.db"), 0600, &bOpts)
71+
if err != nil {
72+
return nil, err
73+
}
74+
return func(sr *io.SectionReader, opts ...metadata.Option) (metadata.Reader, error) {
75+
return dbmetadata.NewReader(db, sr, opts...)
76+
}, nil
77+
default:
78+
return nil, fmt.Errorf("unknown metadata store type: %v; must be %v or %v",
79+
config.MetadataStore, memoryMetadataType, dbMetadataType)
80+
}
81+
}

cmd/containerd-stargz-grpc/main.go

Lines changed: 86 additions & 99 deletions
Original file line numberDiff line numberDiff line change
@@ -20,43 +20,32 @@ import (
2020
"context"
2121
"flag"
2222
"fmt"
23-
"io"
2423
golog "log"
2524
"math/rand"
2625
"net"
2726
"net/http"
2827
"os"
28+
"os/exec"
2929
"os/signal"
3030
"path/filepath"
3131
"time"
3232

3333
snapshotsapi "github.com/containerd/containerd/api/services/snapshots/v1"
3434
"github.com/containerd/containerd/v2/contrib/snapshotservice"
3535
"github.com/containerd/containerd/v2/core/snapshots"
36-
"github.com/containerd/containerd/v2/defaults"
37-
"github.com/containerd/containerd/v2/pkg/dialer"
3836
"github.com/containerd/containerd/v2/pkg/sys"
3937
"github.com/containerd/log"
40-
dbmetadata "github.com/containerd/stargz-snapshotter/cmd/containerd-stargz-grpc/db"
41-
ipfs "github.com/containerd/stargz-snapshotter/cmd/containerd-stargz-grpc/ipfs"
42-
"github.com/containerd/stargz-snapshotter/fs"
43-
"github.com/containerd/stargz-snapshotter/metadata"
44-
memorymetadata "github.com/containerd/stargz-snapshotter/metadata/memory"
38+
"github.com/containerd/stargz-snapshotter/cmd/containerd-stargz-grpc/fsopts"
39+
"github.com/containerd/stargz-snapshotter/cmd/stargz-fuse-manager/fusemanager"
4540
"github.com/containerd/stargz-snapshotter/service"
46-
"github.com/containerd/stargz-snapshotter/service/keychain/cri"
47-
"github.com/containerd/stargz-snapshotter/service/keychain/dockerconfig"
48-
"github.com/containerd/stargz-snapshotter/service/keychain/kubeconfig"
49-
"github.com/containerd/stargz-snapshotter/service/resolver"
41+
"github.com/containerd/stargz-snapshotter/service/keychain/keychainconfig"
42+
snbase "github.com/containerd/stargz-snapshotter/snapshot"
5043
"github.com/containerd/stargz-snapshotter/version"
5144
sddaemon "github.com/coreos/go-systemd/v22/daemon"
5245
metrics "github.com/docker/go-metrics"
5346
"github.com/pelletier/go-toml"
54-
bolt "go.etcd.io/bbolt"
5547
"golang.org/x/sys/unix"
5648
"google.golang.org/grpc"
57-
"google.golang.org/grpc/backoff"
58-
"google.golang.org/grpc/credentials/insecure"
59-
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
6049
)
6150

6251
const (
@@ -65,14 +54,19 @@ const (
6554
defaultLogLevel = log.InfoLevel
6655
defaultRootDir = "/var/lib/containerd-stargz-grpc"
6756
defaultImageServiceAddress = "/run/containerd/containerd.sock"
57+
defaultFuseManagerAddress = "/run/containerd-stargz-grpc/fuse-manager.sock"
58+
59+
fuseManagerBin = "stargz-fuse-manager"
60+
fuseManagerAddress = "fuse-manager.sock"
6861
)
6962

7063
var (
71-
address = flag.String("address", defaultAddress, "address for the snapshotter's GRPC server")
72-
configPath = flag.String("config", defaultConfigPath, "path to the configuration file")
73-
logLevel = flag.String("log-level", defaultLogLevel.String(), "set the logging level [trace, debug, info, warn, error, fatal, panic]")
74-
rootDir = flag.String("root", defaultRootDir, "path to the root directory for this snapshotter")
75-
printVersion = flag.Bool("version", false, "print the version")
64+
address = flag.String("address", defaultAddress, "address for the snapshotter's GRPC server")
65+
configPath = flag.String("config", defaultConfigPath, "path to the configuration file")
66+
logLevel = flag.String("log-level", defaultLogLevel.String(), "set the logging level [trace, debug, info, warn, error, fatal, panic]")
67+
rootDir = flag.String("root", defaultRootDir, "path to the root directory for this snapshotter")
68+
detachFuseManager = flag.Bool("detach-fuse-manager", false, "whether detach fusemanager or not")
69+
printVersion = flag.Bool("version", false, "print the version")
7670
)
7771

7872
type snapshotterConfig struct {
@@ -92,6 +86,11 @@ type snapshotterConfig struct {
9286

9387
// MetadataStore is the type of the metadata store to use.
9488
MetadataStore string `toml:"metadata_store" default:"memory"`
89+
// FuseManagerAddress is address for the fusemanager's GRPC server
90+
FuseManagerAddress string `toml:"fusemanager_address"`
91+
92+
// FuseManagerPath is path to the fusemanager's executable
93+
FuseManagerPath string `toml:"fusemanager_path"`
9594
}
9695

9796
func main() {
@@ -140,51 +139,84 @@ func main() {
140139
}
141140

142141
// Configure keychain
143-
credsFuncs := []resolver.Credential{dockerconfig.NewDockerconfigKeychain(ctx)}
144-
if config.Config.KubeconfigKeychainConfig.EnableKeychain {
145-
var opts []kubeconfig.Option
146-
if kcp := config.Config.KubeconfigKeychainConfig.KubeconfigPath; kcp != "" {
147-
opts = append(opts, kubeconfig.WithKubeconfigPath(kcp))
148-
}
149-
credsFuncs = append(credsFuncs, kubeconfig.NewKubeconfigKeychain(ctx, opts...))
142+
keyChainConfig := keychainconfig.Config{
143+
EnableKubeKeychain: config.Config.KubeconfigKeychainConfig.EnableKeychain,
144+
EnableCRIKeychain: config.Config.CRIKeychainConfig.EnableKeychain,
145+
KubeconfigPath: config.Config.KubeconfigKeychainConfig.KubeconfigPath,
146+
DefaultImageServiceAddress: defaultImageServiceAddress,
147+
ImageServicePath: config.CRIKeychainConfig.ImageServicePath,
150148
}
151-
if config.Config.CRIKeychainConfig.EnableKeychain {
152-
// connects to the backend CRI service (defaults to containerd socket)
153-
criAddr := defaultImageServiceAddress
154-
if cp := config.CRIKeychainConfig.ImageServicePath; cp != "" {
155-
criAddr = cp
149+
150+
var rs snapshots.Snapshotter
151+
if *detachFuseManager {
152+
fmPath := config.FuseManagerPath
153+
if fmPath == "" {
154+
var err error
155+
fmPath, err = exec.LookPath(fuseManagerBin)
156+
if err != nil {
157+
log.G(ctx).WithError(err).Fatalf("failed to find fusemanager bin")
158+
}
156159
}
157-
connectCRI := func() (runtime.ImageServiceClient, error) {
158-
conn, err := newCRIConn(criAddr)
160+
fmAddr := config.FuseManagerAddress
161+
if fmAddr == "" {
162+
var err error
163+
fmAddr, err = exec.LookPath(fuseManagerAddress)
159164
if err != nil {
160-
return nil, err
165+
fmAddr = defaultFuseManagerAddress
161166
}
162-
return runtime.NewImageServiceClient(conn), nil
163167
}
164-
f, criServer := cri.NewCRIKeychain(ctx, connectCRI)
165-
runtime.RegisterImageServiceServer(rpc, criServer)
166-
credsFuncs = append(credsFuncs, f)
167-
}
168-
fsOpts := []fs.Option{fs.WithMetricsLogLevel(log.InfoLevel)}
169-
if config.IPFS {
170-
fsOpts = append(fsOpts, fs.WithResolveHandler("ipfs", new(ipfs.ResolveHandler)))
171-
}
172-
mt, err := getMetadataStore(*rootDir, config)
173-
if err != nil {
174-
log.G(ctx).WithError(err).Fatalf("failed to configure metadata store")
175-
}
176-
fsOpts = append(fsOpts, fs.WithMetadataStore(mt))
177-
rs, err := service.NewStargzSnapshotterService(ctx, *rootDir, &config.Config,
178-
service.WithCredsFuncs(credsFuncs...), service.WithFilesystemOptions(fsOpts...))
179-
if err != nil {
180-
log.G(ctx).WithError(err).Fatalf("failed to configure snapshotter")
168+
err := service.StartFuseManager(ctx, fmPath, fmAddr, filepath.Join(*rootDir, "fusestore.db"), *logLevel, filepath.Join(*rootDir, "stargz-fuse-manager.log"))
169+
if err != nil {
170+
log.G(ctx).WithError(err).Fatalf("failed to start fusemanager")
171+
}
172+
173+
fuseManagerConfig := fusemanager.Config{
174+
Config: &config.Config,
175+
IPFS: config.IPFS,
176+
MetadataStore: config.MetadataStore,
177+
DefaultImageServiceAddress: defaultImageServiceAddress,
178+
}
179+
180+
fs, err := fusemanager.NewManagerClient(ctx, *rootDir, fmAddr, &fuseManagerConfig)
181+
if err != nil {
182+
log.G(ctx).WithError(err).Fatalf("failed to configure fusemanager")
183+
}
184+
rs, err = snbase.NewSnapshotter(ctx, filepath.Join(*rootDir, "snapshotter"), fs, snbase.AsynchronousRemove)
185+
if err != nil {
186+
log.G(ctx).WithError(err).Fatalf("failed to configure snapshotter")
187+
}
188+
log.G(ctx).Infof("Start snapshotter with fusemanager mode")
189+
} else {
190+
credsFuncs, err := keychainconfig.ConfigKeychain(ctx, rpc, &keyChainConfig)
191+
if err != nil {
192+
log.G(ctx).WithError(err).Fatalf("failed to configure keychain")
193+
}
194+
195+
fsConfig := fsopts.Config{
196+
EnableIpfs: config.IPFS,
197+
MetadataStore: config.MetadataStore,
198+
}
199+
fsOpts, err := fsopts.ConfigFsOpts(ctx, *rootDir, &fsConfig)
200+
if err != nil {
201+
log.G(ctx).WithError(err).Fatalf("failed to configure fs config")
202+
}
203+
204+
rs, err = service.NewStargzSnapshotterService(ctx, *rootDir, &config.Config,
205+
service.WithCredsFuncs(credsFuncs...), service.WithFilesystemOptions(fsOpts...))
206+
if err != nil {
207+
log.G(ctx).WithError(err).Fatalf("failed to configure snapshotter")
208+
}
181209
}
182210

183211
cleanup, err := serve(ctx, rpc, *address, rs, config)
184212
if err != nil {
185213
log.G(ctx).WithError(err).Fatalf("failed to serve snapshotter")
186214
}
187215

216+
// TODO: In detach mode, rs is taken over by fusemanager,
217+
// but client will send unmount request to fusemanager,
218+
// and fusemanager need get mount info from local db to
219+
// determine its behavior
188220
if cleanup {
189221
log.G(ctx).Debug("Closing the snapshotter")
190222
rs.Close()
@@ -275,48 +307,3 @@ func serve(ctx context.Context, rpc *grpc.Server, addr string, rs snapshots.Snap
275307
}
276308
return false, nil
277309
}
278-
279-
const (
280-
memoryMetadataType = "memory"
281-
dbMetadataType = "db"
282-
)
283-
284-
func getMetadataStore(rootDir string, config snapshotterConfig) (metadata.Store, error) {
285-
switch config.MetadataStore {
286-
case "", memoryMetadataType:
287-
return memorymetadata.NewReader, nil
288-
case dbMetadataType:
289-
bOpts := bolt.Options{
290-
NoFreelistSync: true,
291-
InitialMmapSize: 64 * 1024 * 1024,
292-
FreelistType: bolt.FreelistMapType,
293-
}
294-
db, err := bolt.Open(filepath.Join(rootDir, "metadata.db"), 0600, &bOpts)
295-
if err != nil {
296-
return nil, err
297-
}
298-
return func(sr *io.SectionReader, opts ...metadata.Option) (metadata.Reader, error) {
299-
return dbmetadata.NewReader(db, sr, opts...)
300-
}, nil
301-
default:
302-
return nil, fmt.Errorf("unknown metadata store type: %v; must be %v or %v",
303-
config.MetadataStore, memoryMetadataType, dbMetadataType)
304-
}
305-
}
306-
307-
func newCRIConn(criAddr string) (*grpc.ClientConn, error) {
308-
// TODO: make gRPC options configurable from config.toml
309-
backoffConfig := backoff.DefaultConfig
310-
backoffConfig.MaxDelay = 3 * time.Second
311-
connParams := grpc.ConnectParams{
312-
Backoff: backoffConfig,
313-
}
314-
gopts := []grpc.DialOption{
315-
grpc.WithTransportCredentials(insecure.NewCredentials()),
316-
grpc.WithConnectParams(connParams),
317-
grpc.WithContextDialer(dialer.ContextDialer),
318-
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(defaults.DefaultMaxRecvMsgSize)),
319-
grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(defaults.DefaultMaxSendMsgSize)),
320-
}
321-
return grpc.NewClient(dialer.DialAddress(criAddr), gopts...)
322-
}

0 commit comments

Comments
 (0)