Skip to content

Commit ce664b9

Browse files
committed
Fix to pass CRI test
- Add "listen_path" field to the config to specify a custom socket path for the CRI image service. Especially fuse manager needs this to locate the image service socket. - Add JSON tag to configuration fields. This is needed to send configuration to fuse manager in JSON format. Signed-off-by: Kohei Tokunaga <[email protected]>
1 parent 5c622bf commit ce664b9

File tree

11 files changed

+173
-80
lines changed

11 files changed

+173
-80
lines changed

cmd/containerd-stargz-grpc/main.go

Lines changed: 40 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -70,33 +70,33 @@ type snapshotterConfig struct {
7070
service.Config
7171

7272
// MetricsAddress is address for the metrics API
73-
MetricsAddress string `toml:"metrics_address"`
73+
MetricsAddress string `toml:"metrics_address" json:"metrics_address"`
7474

7575
// NoPrometheus is a flag to disable the emission of the metrics
76-
NoPrometheus bool `toml:"no_prometheus"`
76+
NoPrometheus bool `toml:"no_prometheus" json:"no_prometheus"`
7777

7878
// DebugAddress is a Unix domain socket address where the snapshotter exposes /debug/ endpoints.
79-
DebugAddress string `toml:"debug_address"`
79+
DebugAddress string `toml:"debug_address" json:"debug_address"`
8080

8181
// IPFS is a flag to enbale lazy pulling from IPFS.
82-
IPFS bool `toml:"ipfs"`
82+
IPFS bool `toml:"ipfs" json:"ipfs"`
8383

8484
// MetadataStore is the type of the metadata store to use.
85-
MetadataStore string `toml:"metadata_store" default:"memory"`
85+
MetadataStore string `toml:"metadata_store" default:"memory" json:"metadata_store"`
8686

8787
// FuseManagerConfig is configuration for fusemanager
88-
FuseManagerConfig `toml:"fusemanager"`
88+
FuseManagerConfig `toml:"fusemanager" json:"fusemanager"`
8989
}
9090

9191
type FuseManagerConfig struct {
9292
// EnableFuseManager is whether detach fusemanager or not
93-
EnableFuseManager bool `toml:"enable_fusemanager" default:"false"`
93+
EnableFuseManager bool `toml:"enable_fusemanager" default:"false" json:"enable_fusemanager"`
9494

9595
// FuseManagerAddress is address for the fusemanager's GRPC server (default: "/run/containerd-stargz-grpc/fuse-manager.sock")
96-
FuseManagerAddress string `toml:"fusemanager_address"`
96+
FuseManagerAddress string `toml:"fusemanager_address" json:"fusemanager_address"`
9797

9898
// FuseManagerPath is path to the fusemanager's executable (default: looking for a binary "stargz-fuse-manager")
99-
FuseManagerPath string `toml:"fusemanager_path"`
99+
FuseManagerPath string `toml:"fusemanager_path" json:"fusemanager_path"`
100100
}
101101

102102
func main() {
@@ -177,7 +177,7 @@ func main() {
177177
}
178178

179179
fuseManagerConfig := fusemanager.Config{
180-
Config: &config.Config,
180+
Config: config.Config,
181181
IPFS: config.IPFS,
182182
MetadataStore: config.MetadataStore,
183183
DefaultImageServiceAddress: defaultImageServiceAddress,
@@ -193,10 +193,39 @@ func main() {
193193
}
194194
log.G(ctx).Infof("Start snapshotter with fusemanager mode")
195195
} else {
196-
credsFuncs, err := keychainconfig.ConfigKeychain(ctx, rpc, &keyChainConfig)
196+
crirpc := rpc
197+
// For CRI keychain, if listening path is different from stargz-snapshotter's socket, prepare for the dedicated grpc server and the socket.
198+
serveCRISocket := config.Config.CRIKeychainConfig.EnableKeychain && config.Config.CRIKeychainConfig.ListenPath != "" && config.Config.CRIKeychainConfig.ListenPath != *address
199+
if serveCRISocket {
200+
crirpc = grpc.NewServer()
201+
}
202+
credsFuncs, err := keychainconfig.ConfigKeychain(ctx, crirpc, &keyChainConfig)
197203
if err != nil {
198204
log.G(ctx).WithError(err).Fatalf("failed to configure keychain")
199205
}
206+
if serveCRISocket {
207+
addr := config.Config.CRIKeychainConfig.ListenPath
208+
// Prepare the directory for the socket
209+
if err := os.MkdirAll(filepath.Dir(addr), 0700); err != nil {
210+
log.G(ctx).WithError(err).Fatalf("failed to create directory %q", filepath.Dir(addr))
211+
}
212+
213+
// Try to remove the socket file to avoid EADDRINUSE
214+
if err := os.RemoveAll(addr); err != nil {
215+
log.G(ctx).WithError(err).Fatalf("failed to remove %q", addr)
216+
}
217+
218+
// Listen and serve
219+
l, err := net.Listen("unix", addr)
220+
if err != nil {
221+
log.G(ctx).WithError(err).Fatalf("error on listen socket %q", addr)
222+
}
223+
go func() {
224+
if err := crirpc.Serve(l); err != nil {
225+
log.G(ctx).WithError(err).Errorf("error on serving CRI via socket %q", addr)
226+
}
227+
}()
228+
}
200229

201230
fsConfig := fsopts.Config{
202231
EnableIpfs: config.IPFS,

cmd/stargz-fuse-manager/main.go

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,18 @@
1717
package main
1818

1919
import (
20+
"fmt"
21+
"net"
22+
"os"
23+
"path/filepath"
24+
25+
"github.com/containerd/log"
26+
2027
"github.com/containerd/stargz-snapshotter/cmd/containerd-stargz-grpc/fsopts"
2128
fusemanager "github.com/containerd/stargz-snapshotter/fusemanager"
2229
"github.com/containerd/stargz-snapshotter/service"
2330
"github.com/containerd/stargz-snapshotter/service/keychain/keychainconfig"
31+
"google.golang.org/grpc"
2432
)
2533

2634
func init() {
@@ -44,10 +52,41 @@ func init() {
4452
DefaultImageServiceAddress: cc.Config.DefaultImageServiceAddress,
4553
ImageServicePath: cc.Config.Config.CRIKeychainConfig.ImageServicePath,
4654
}
47-
credsFuncs, err := keychainconfig.ConfigKeychain(cc.Ctx, cc.Server, &keyChainConfig)
55+
if cc.Config.Config.CRIKeychainConfig.EnableKeychain && cc.Config.Config.CRIKeychainConfig.ListenPath == "" || cc.Config.Config.CRIKeychainConfig.ListenPath == cc.Address {
56+
return nil, fmt.Errorf("listen path must be specified as a separated socket")
57+
}
58+
// For CRI keychain, if listening path is different from stargz-snapshotter's socket, prepare for the dedicated grpc server and the socket.
59+
serveCRISocket := cc.Config.Config.CRIKeychainConfig.EnableKeychain && cc.Config.Config.CRIKeychainConfig.ListenPath != "" && cc.Config.Config.CRIKeychainConfig.ListenPath != cc.Address
60+
if serveCRISocket {
61+
cc.CRIServer = grpc.NewServer()
62+
}
63+
credsFuncs, err := keychainconfig.ConfigKeychain(cc.Ctx, cc.CRIServer, &keyChainConfig)
4864
if err != nil {
4965
return nil, err
5066
}
67+
if serveCRISocket {
68+
addr := cc.Config.Config.CRIKeychainConfig.ListenPath
69+
// Prepare the directory for the socket
70+
if err := os.MkdirAll(filepath.Dir(addr), 0700); err != nil {
71+
return nil, fmt.Errorf("failed to create directory %q: %w", filepath.Dir(addr), err)
72+
}
73+
74+
// Try to remove the socket file to avoid EADDRINUSE
75+
if err := os.RemoveAll(addr); err != nil {
76+
return nil, fmt.Errorf("failed to remove %q: %w", addr, err)
77+
}
78+
79+
// Listen and serve
80+
l, err := net.Listen("unix", addr)
81+
if err != nil {
82+
return nil, fmt.Errorf("error on listen socket %q: %w", addr, err)
83+
}
84+
go func() {
85+
if err := cc.CRIServer.Serve(l); err != nil {
86+
log.G(cc.Ctx).WithError(err).Errorf("error on serving CRI via socket %q", addr)
87+
}
88+
}()
89+
}
5190
return []service.Option{service.WithCredsFuncs(credsFuncs...)}, nil
5291
})
5392
}

fs/config/config.go

Lines changed: 32 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -37,118 +37,118 @@ const (
3737
type Config struct {
3838
// Type of cache for compressed contents fetched from the registry. "memory" stores them on memory.
3939
// Other values default to cache them on disk.
40-
HTTPCacheType string `toml:"http_cache_type"`
40+
HTTPCacheType string `toml:"http_cache_type" json:"http_cache_type"`
4141

4242
// Type of cache for uncompressed files contents. "memory" stores them on memory. Other values
4343
// default to cache them on disk.
44-
FSCacheType string `toml:"filesystem_cache_type"`
44+
FSCacheType string `toml:"filesystem_cache_type" json:"filesystem_cache_type"`
4545

4646
// ResolveResultEntryTTLSec is TTL (in sec) to cache resolved layers for
4747
// future use. (default 120s)
48-
ResolveResultEntryTTLSec int `toml:"resolve_result_entry_ttl_sec"`
48+
ResolveResultEntryTTLSec int `toml:"resolve_result_entry_ttl_sec" json:"resolve_result_entry_ttl_sec"`
4949

5050
// PrefetchSize is the default size (in bytes) to prefetch when mounting a layer. Default is 0. Stargz-snapshotter still
5151
// uses the value specified by the image using "containerd.io/snapshot/remote/stargz.prefetch" or the landmark file.
52-
PrefetchSize int64 `toml:"prefetch_size"`
52+
PrefetchSize int64 `toml:"prefetch_size" json:"prefetch_size"`
5353

5454
// PrefetchTimeoutSec is the default timeout (in seconds) when the prefetching takes long. Default is 10s.
55-
PrefetchTimeoutSec int64 `toml:"prefetch_timeout_sec"`
55+
PrefetchTimeoutSec int64 `toml:"prefetch_timeout_sec" json:"prefetch_timeout_sec"`
5656

5757
// NoPrefetch disables prefetching. Default is false.
58-
NoPrefetch bool `toml:"noprefetch"`
58+
NoPrefetch bool `toml:"noprefetch" json:"noprefetch"`
5959

6060
// NoBackgroundFetch disables the behaviour of fetching the entire layer contents in background. Default is false.
61-
NoBackgroundFetch bool `toml:"no_background_fetch"`
61+
NoBackgroundFetch bool `toml:"no_background_fetch" json:"no_background_fetch"`
6262

6363
// Debug enables filesystem debug log.
64-
Debug bool `toml:"debug"`
64+
Debug bool `toml:"debug" json:"debug"`
6565

6666
// AllowNoVerification allows mouting images without verification. Default is false.
67-
AllowNoVerification bool `toml:"allow_no_verification"`
67+
AllowNoVerification bool `toml:"allow_no_verification" json:"allow_no_verification"`
6868

6969
// DisableVerification disables verifying layer contents. Default is false.
70-
DisableVerification bool `toml:"disable_verification"`
70+
DisableVerification bool `toml:"disable_verification" json:"disable_verification"`
7171

7272
// MaxConcurrency is max number of concurrent background tasks for fetching layer contents. Default is 2.
73-
MaxConcurrency int64 `toml:"max_concurrency"`
73+
MaxConcurrency int64 `toml:"max_concurrency" json:"max_concurrency"`
7474

7575
// NoPrometheus disables exposing filesystem-related metrics. Default is false.
76-
NoPrometheus bool `toml:"no_prometheus"`
76+
NoPrometheus bool `toml:"no_prometheus" json:"no_prometheus"`
7777

7878
// BlobConfig is config for layer blob management.
79-
BlobConfig `toml:"blob"`
79+
BlobConfig `toml:"blob" json:"blob"`
8080

8181
// DirectoryCacheConfig is config for directory-based cache.
82-
DirectoryCacheConfig `toml:"directory_cache"`
82+
DirectoryCacheConfig `toml:"directory_cache" json:"directory_cache"`
8383

8484
// FuseConfig is configurations for FUSE fs.
85-
FuseConfig `toml:"fuse"`
85+
FuseConfig `toml:"fuse" json:"fuse"`
8686

8787
// ResolveResultEntry is a deprecated field.
88-
ResolveResultEntry int `toml:"resolve_result_entry"` // deprecated
88+
ResolveResultEntry int `toml:"resolve_result_entry" json:"resolve_result_entry"` // deprecated
8989
}
9090

9191
// BlobConfig is configuration for the logic to fetching blobs.
9292
type BlobConfig struct {
9393
// ValidInterval specifies a duration (in seconds) during which the layer can be reused without
9494
// checking the connection to the registry. Default is 60.
95-
ValidInterval int64 `toml:"valid_interval"`
95+
ValidInterval int64 `toml:"valid_interval" json:"valid_interval"`
9696

9797
// CheckAlways overwrites ValidInterval to 0 if it's true. Default is false.
98-
CheckAlways bool `toml:"check_always"`
98+
CheckAlways bool `toml:"check_always" json:"check_always"`
9999

100100
// ChunkSize is the granularity (in bytes) at which background fetch and on-demand reads
101101
// are fetched from the remote registry. Default is 50000.
102-
ChunkSize int64 `toml:"chunk_size"`
102+
ChunkSize int64 `toml:"chunk_size" json:"chunk_size"`
103103

104104
// FetchTimeoutSec is a timeout duration (in seconds) for fetching chunks from the registry. Default is 300.
105-
FetchTimeoutSec int64 `toml:"fetching_timeout_sec"`
105+
FetchTimeoutSec int64 `toml:"fetching_timeout_sec" json:"fetching_tieout_sec"`
106106

107107
// ForceSingleRangeMode disables using of multiple ranges in a Range Request and always specifies one larger
108108
// region that covers them. Default is false.
109-
ForceSingleRangeMode bool `toml:"force_single_range_mode"`
109+
ForceSingleRangeMode bool `toml:"force_single_range_mode" json:"force_single_range_mode"`
110110

111111
// PrefetchChunkSize is the maximum bytes transferred per http GET from remote registry
112112
// during prefetch. It is recommended to have PrefetchChunkSize > ChunkSize.
113113
// If PrefetchChunkSize < ChunkSize prefetch bytes will be fetched as a single http GET,
114114
// else total GET requests for prefetch = ceil(PrefetchSize / PrefetchChunkSize).
115115
// Default is 0.
116-
PrefetchChunkSize int64 `toml:"prefetch_chunk_size"`
116+
PrefetchChunkSize int64 `toml:"prefetch_chunk_size" json:"prefetch_chunk_size"`
117117

118118
// MaxRetries is a max number of reries of a HTTP request. Default is 5.
119-
MaxRetries int `toml:"max_retries"`
119+
MaxRetries int `toml:"max_retries" json:"max_retries"`
120120

121121
// MinWaitMSec is minimal delay (in seconds) for the next retrying after a request failure. Default is 30.
122-
MinWaitMSec int `toml:"min_wait_msec"`
122+
MinWaitMSec int `toml:"min_wait_msec" json:"min_wait_msec"`
123123

124124
// MinWaitMSec is maximum delay (in seconds) for the next retrying after a request failure. Default is 30.
125-
MaxWaitMSec int `toml:"max_wait_msec"`
125+
MaxWaitMSec int `toml:"max_wait_msec" json:"max_wait_msec"`
126126
}
127127

128128
// DirectoryCacheConfig is configuration for the disk-based cache.
129129
type DirectoryCacheConfig struct {
130130
// MaxLRUCacheEntry is the number of entries of LRU cache to cache data on memory. Default is 10.
131-
MaxLRUCacheEntry int `toml:"max_lru_cache_entry"`
131+
MaxLRUCacheEntry int `toml:"max_lru_cache_entry" json:"max_lru_cache_entry"`
132132

133133
// MaxCacheFds is the number of entries of LRU cache to hold fds of files of cached contents. Default is 10.
134-
MaxCacheFds int `toml:"max_cache_fds"`
134+
MaxCacheFds int `toml:"max_cache_fds" json:"max_cache_fds"`
135135

136136
// SyncAdd being true means that each adding of data to the cache blocks until the data is fully written to the
137137
// cache directory. Default is false.
138-
SyncAdd bool `toml:"sync_add"`
138+
SyncAdd bool `toml:"sync_add" json:"sync_add"`
139139

140140
// Direct disables on-memory data cache. Default is true for saving memory usage.
141-
Direct bool `toml:"direct" default:"true"`
141+
Direct bool `toml:"direct" default:"true" json:"direct"`
142142
}
143143

144144
// FuseConfig is configuration for FUSE fs.
145145
type FuseConfig struct {
146146
// AttrTimeout defines overall timeout attribute for a file system in seconds.
147-
AttrTimeout int64 `toml:"attr_timeout"`
147+
AttrTimeout int64 `toml:"attr_timeout" json:"attr_timeout"`
148148

149149
// EntryTimeout defines TTL for directory, name lookup in seconds.
150-
EntryTimeout int64 `toml:"entry_timeout"`
150+
EntryTimeout int64 `toml:"entry_timeout" json:"entry_timeout"`
151151

152152
// PassThrough indicates whether to enable FUSE passthrough mode to improve local file read performance. Default is false.
153-
PassThrough bool `toml:"passthrough" default:"false"`
153+
PassThrough bool `toml:"passthrough" default:"false" json:"passthrough"`
154154
}

fusemanager/fusemanager.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,7 @@ func runFuseManager(ctx context.Context) error {
194194
}
195195

196196
server := grpc.NewServer()
197-
fm, err := NewFuseManager(ctx, l, server, fuseStoreAddr)
197+
fm, err := NewFuseManager(ctx, l, server, fuseStoreAddr, address)
198198
if err != nil {
199199
return fmt.Errorf("failed to configure manager server: %w", err)
200200
}

fusemanager/fusemanager_test.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -85,8 +85,8 @@ type mockServer struct {
8585
initErr error
8686
}
8787

88-
func newMockServer(ctx context.Context, listener net.Listener, server *grpc.Server, fuseStoreAddr string) (*mockServer, error) {
89-
s, err := NewFuseManager(ctx, listener, server, fuseStoreAddr)
88+
func newMockServer(ctx context.Context, listener net.Listener, server *grpc.Server, fuseStoreAddr, serverAddr string) (*mockServer, error) {
89+
s, err := NewFuseManager(ctx, listener, server, fuseStoreAddr, serverAddr)
9090
if err != nil {
9191
return nil, err
9292
}
@@ -121,6 +121,7 @@ func TestFuseManager(t *testing.T) {
121121

122122
socketPath := filepath.Join(tmpDir, "test.sock")
123123
fuseStorePath := filepath.Join(tmpDir, "fusestore.db")
124+
fuseManagerSocketPath := filepath.Join(tmpDir, "test-fusemanager.sock")
124125

125126
l, err := net.Listen("unix", socketPath)
126127
if err != nil {
@@ -131,7 +132,7 @@ func TestFuseManager(t *testing.T) {
131132
// Create server with mock
132133
grpcServer := grpc.NewServer()
133134
mockFs := newMockFileSystem(t)
134-
fm, err := newMockServer(context.Background(), l, grpcServer, fuseStorePath)
135+
fm, err := newMockServer(context.Background(), l, grpcServer, fuseStorePath, fuseManagerSocketPath)
135136
if err != nil {
136137
t.Fatalf("failed to create fuse manager: %v", err)
137138
}
@@ -187,7 +188,7 @@ func TestFuseManager(t *testing.T) {
187188
fm.initCalled = false
188189

189190
config := &Config{
190-
Config: &service.Config{},
191+
Config: service.Config{},
191192
}
192193
client, err := NewManagerClient(context.Background(), tmpDir, socketPath, config)
193194
if err != nil {

0 commit comments

Comments
 (0)