Skip to content

Commit efbcc75

Browse files
committed
feat: implement hardlinking for cache files to reduce storage usage
This commit adds a hardlink system for the Stargz Snapshotter cache to optimize storage and improve performance. The system intelligently creates hardlinks between identical content chunks, significantly reducing disk space usage in environments with many containers using the same base layers. Key changes: - Add new HardlinkManager that tracks files by chunk digest - Enable hardlinking between chunk files with same content - Add configuration option `EnableHardlink` to control the feature - Preserve file digest mapping across snapshotter restarts - Add documentation on hardlink usage and configuration The implementation includes: - Chunk-level digest tracking for optimizing cache lookups - Background persistence of hardlink mappings to survive restarts - Automatic cleanup of unused digest mappings - Test suite for hardlink functionality Signed-off-by: ChengyuZhu6 <[email protected]>
1 parent 0790ac8 commit efbcc75

File tree

16 files changed

+1395
-133
lines changed

16 files changed

+1395
-133
lines changed

cache/cache.go

Lines changed: 162 additions & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,13 @@ package cache
1818

1919
import (
2020
"bytes"
21-
"errors"
2221
"fmt"
2322
"io"
2423
"os"
2524
"path/filepath"
2625
"sync"
2726

27+
"github.com/containerd/log"
2828
"github.com/containerd/stargz-snapshotter/util/cacheutil"
2929
"github.com/containerd/stargz-snapshotter/util/namedmutex"
3030
)
@@ -61,6 +61,9 @@ type DirectoryCacheConfig struct {
6161
// Direct forcefully enables direct mode for all operation in cache.
6262
// Thus operation won't use on-memory caches.
6363
Direct bool
64+
65+
// EnableHardlink enables hardlinking of cache files to reduce storage usage
66+
EnableHardlink bool
6467
}
6568

6669
// TODO: contents validation.
@@ -99,6 +102,7 @@ type Writer interface {
99102
type cacheOpt struct {
100103
direct bool
101104
passThrough bool
105+
chunkDigest string
102106
}
103107

104108
type Option func(o *cacheOpt) *cacheOpt
@@ -123,6 +127,14 @@ func PassThrough() Option {
123127
}
124128
}
125129

130+
// ChunkDigest option allows specifying a chunk digest for the cache
131+
func ChunkDigest(digest string) Option {
132+
return func(o *cacheOpt) *cacheOpt {
133+
o.chunkDigest = digest
134+
return o
135+
}
136+
}
137+
126138
func NewDirectoryCache(directory string, config DirectoryCacheConfig) (BlobCache, error) {
127139
if !filepath.IsAbs(directory) {
128140
return nil, fmt.Errorf("dir cache path must be an absolute path; got %q", directory)
@@ -166,15 +178,24 @@ func NewDirectoryCache(directory string, config DirectoryCacheConfig) (BlobCache
166178
return nil, err
167179
}
168180
dc := &directoryCache{
169-
cache: dataCache,
170-
fileCache: fdCache,
171-
wipLock: new(namedmutex.NamedMutex),
172-
directory: directory,
173-
wipDirectory: wipdir,
174-
bufPool: bufPool,
175-
direct: config.Direct,
181+
cache: dataCache,
182+
fileCache: fdCache,
183+
wipLock: new(namedmutex.NamedMutex),
184+
directory: directory,
185+
wipDirectory: wipdir,
186+
bufPool: bufPool,
187+
direct: config.Direct,
188+
enableHardlink: config.EnableHardlink,
189+
syncAdd: config.SyncAdd,
190+
}
191+
192+
// Initialize hardlink manager if enabled
193+
if config.EnableHardlink {
194+
hlManager, enabled := InitializeHardlinkManager(filepath.Dir(filepath.Dir(directory)), config.EnableHardlink)
195+
dc.hlManager = hlManager
196+
dc.enableHardlink = enabled
176197
}
177-
dc.syncAdd = config.SyncAdd
198+
178199
return dc, nil
179200
}
180201

@@ -193,6 +214,9 @@ type directoryCache struct {
193214

194215
closed bool
195216
closedMu sync.Mutex
217+
218+
enableHardlink bool
219+
hlManager *HardlinkManager
196220
}
197221

198222
func (dc *directoryCache) Get(key string, opts ...Option) (Reader, error) {
@@ -205,9 +229,15 @@ func (dc *directoryCache) Get(key string, opts ...Option) (Reader, error) {
205229
opt = o(opt)
206230
}
207231

232+
// Try to get from memory cache
208233
if !dc.direct && !opt.direct {
209-
// Get data from memory
210-
if b, done, ok := dc.cache.Get(key); ok {
234+
// Try memory cache for digest or key
235+
cacheKey := key
236+
if dc.hlManager != nil && dc.hlManager.IsEnabled() && opt.chunkDigest != "" {
237+
cacheKey = opt.chunkDigest
238+
}
239+
240+
if b, done, ok := dc.cache.Get(cacheKey); ok {
211241
return &reader{
212242
ReaderAt: bytes.NewReader(b.(*bytes.Buffer).Bytes()),
213243
closeFunc: func() error {
@@ -217,8 +247,8 @@ func (dc *directoryCache) Get(key string, opts ...Option) (Reader, error) {
217247
}, nil
218248
}
219249

220-
// Get data from disk. If the file is already opened, use it.
221-
if f, done, ok := dc.fileCache.Get(key); ok {
250+
// Get data from file cache for digest or key
251+
if f, done, ok := dc.fileCache.Get(cacheKey); ok {
222252
return &reader{
223253
ReaderAt: f.(*os.File),
224254
closeFunc: func() error {
@@ -229,24 +259,28 @@ func (dc *directoryCache) Get(key string, opts ...Option) (Reader, error) {
229259
}
230260
}
231261

262+
// First try regular file path
263+
filepath := BuildCachePath(dc.directory, key)
264+
265+
// Check hardlink manager for existing digest file
266+
if dc.hlManager != nil && opt.chunkDigest != "" {
267+
if digestPath, exists := dc.hlManager.ProcessCacheGet(key, opt.chunkDigest, opt.direct); exists {
268+
log.L.Debugf("Using existing file for digest %q instead of key %q", opt.chunkDigest, key)
269+
filepath = digestPath
270+
}
271+
}
272+
232273
// Open the cache file and read the target region
233-
// TODO: If the target cache is write-in-progress, should we wait for the completion
234-
// or simply report the cache miss?
235-
file, err := os.Open(dc.cachePath(key))
274+
file, err := os.Open(filepath)
236275
if err != nil {
237276
return nil, fmt.Errorf("failed to open blob file for %q: %w", key, err)
238277
}
239278

240-
// If "direct" option is specified, do not cache the file on memory.
241-
// This option is useful for preventing memory cache from being polluted by data
242-
// that won't be accessed immediately.
279+
// If in direct mode, don't cache file descriptor
243280
if dc.direct || opt.direct {
244281
return &reader{
245282
ReaderAt: file,
246283
closeFunc: func() error {
247-
// In passthrough model, close will be taken over by go-fuse
248-
// If "passThrough" option is specified, "direct" option also will
249-
// be specified, so adding this branch here is enough
250284
if opt.passThrough {
251285
return nil
252286
}
@@ -255,16 +289,19 @@ func (dc *directoryCache) Get(key string, opts ...Option) (Reader, error) {
255289
}, nil
256290
}
257291

258-
// TODO: should we cache the entire file data on memory?
259-
// but making I/O (possibly huge) on every fetching
260-
// might be costly.
292+
// Cache file descriptor
261293
return &reader{
262294
ReaderAt: file,
263295
closeFunc: func() error {
264-
_, done, added := dc.fileCache.Add(key, file)
265-
defer done() // Release it immediately. Cleaned up on eviction.
296+
cacheKey := key
297+
if dc.hlManager != nil && dc.hlManager.IsEnabled() && opt.chunkDigest != "" {
298+
cacheKey = opt.chunkDigest
299+
}
300+
301+
_, done, added := dc.fileCache.Add(cacheKey, file)
302+
defer done()
266303
if !added {
267-
return file.Close() // file already exists in the cache. close it.
304+
return file.Close()
268305
}
269306
return nil
270307
},
@@ -281,81 +318,74 @@ func (dc *directoryCache) Add(key string, opts ...Option) (Writer, error) {
281318
opt = o(opt)
282319
}
283320

284-
wip, err := dc.wipFile(key)
321+
// If hardlink manager exists and digest is provided, check if a hardlink can be created
322+
if dc.hlManager != nil && opt.chunkDigest != "" {
323+
keyPath := BuildCachePath(dc.directory, key)
324+
325+
// Try to create a hardlink from existing digest file
326+
if dc.hlManager.ProcessCacheAdd(key, opt.chunkDigest, keyPath) {
327+
// Return a no-op writer since the file already exists
328+
return &writer{
329+
WriteCloser: nopWriteCloser(io.Discard),
330+
commitFunc: func() error { return nil },
331+
abortFunc: func() error { return nil },
332+
}, nil
333+
}
334+
}
335+
336+
// Create temporary file
337+
w, err := WipFile(dc.wipDirectory, key)
285338
if err != nil {
286339
return nil, err
287340
}
288-
w := &writer{
289-
WriteCloser: wip,
341+
342+
// Create writer
343+
writer := &writer{
344+
WriteCloser: w,
290345
commitFunc: func() error {
291346
if dc.isClosed() {
292347
return fmt.Errorf("cache is already closed")
293348
}
294-
// Commit the cache contents
295-
c := dc.cachePath(key)
296-
if err := os.MkdirAll(filepath.Dir(c), os.ModePerm); err != nil {
297-
var errs []error
298-
if err := os.Remove(wip.Name()); err != nil {
299-
errs = append(errs, err)
300-
}
301-
errs = append(errs, fmt.Errorf("failed to create cache directory %q: %w", c, err))
302-
return errors.Join(errs...)
303-
}
304-
return os.Rename(wip.Name(), c)
305-
},
306-
abortFunc: func() error {
307-
return os.Remove(wip.Name())
308-
},
309-
}
310-
311-
// If "direct" option is specified, do not cache the passed data on memory.
312-
// This option is useful for preventing memory cache from being polluted by data
313-
// that won't be accessed immediately.
314-
if dc.direct || opt.direct {
315-
return w, nil
316-
}
317349

318-
b := dc.bufPool.Get().(*bytes.Buffer)
319-
memW := &writer{
320-
WriteCloser: nopWriteCloser(io.Writer(b)),
321-
commitFunc: func() error {
322-
if dc.isClosed() {
323-
w.Close()
324-
return fmt.Errorf("cache is already closed")
350+
// Commit file
351+
targetPath := BuildCachePath(dc.directory, key)
352+
if err := os.MkdirAll(filepath.Dir(targetPath), 0700); err != nil {
353+
return fmt.Errorf("failed to create cache directory: %w", err)
325354
}
326-
cached, done, added := dc.cache.Add(key, b)
327-
if !added {
328-
dc.putBuffer(b) // already exists in the cache. abort it.
355+
356+
if err := os.Rename(w.Name(), targetPath); err != nil {
357+
return fmt.Errorf("failed to commit cache file: %w", err)
329358
}
330-
commit := func() error {
331-
defer done()
332-
defer w.Close()
333-
n, err := w.Write(cached.(*bytes.Buffer).Bytes())
334-
if err != nil || n != cached.(*bytes.Buffer).Len() {
335-
w.Abort()
336-
return err
359+
360+
// If hardlink manager exists and digest is provided, register the file
361+
if dc.hlManager != nil && dc.hlManager.IsEnabled() && opt.chunkDigest != "" {
362+
// Register this file as the primary source for this digest
363+
if err := dc.hlManager.RegisterDigestFile(opt.chunkDigest, targetPath); err != nil {
364+
log.L.Debugf("Failed to register digest file: %v", err)
337365
}
338-
return w.Commit()
339-
}
340-
if dc.syncAdd {
341-
return commit()
342-
}
343-
go func() {
344-
if err := commit(); err != nil {
345-
fmt.Println("failed to commit to file:", err)
366+
367+
// Map key to digest
368+
internalKey := dc.hlManager.GenerateInternalKey(dc.directory, key)
369+
if err := dc.hlManager.MapKeyToDigest(internalKey, opt.chunkDigest); err != nil {
370+
log.L.Debugf("Failed to map key to digest: %v", err)
346371
}
347-
}()
372+
}
373+
348374
return nil
349375
},
350376
abortFunc: func() error {
351-
defer w.Close()
352-
defer w.Abort()
353-
dc.putBuffer(b) // abort it.
354-
return nil
377+
return os.Remove(w.Name())
355378
},
356379
}
357380

358-
return memW, nil
381+
// Return directly if in direct mode
382+
if dc.direct || opt.direct {
383+
return writer, nil
384+
}
385+
386+
// Create memory cache
387+
b := dc.bufPool.Get().(*bytes.Buffer)
388+
return dc.wrapMemoryWriter(b, writer, key)
359389
}
360390

361391
func (dc *directoryCache) putBuffer(b *bytes.Buffer) {
@@ -380,14 +410,6 @@ func (dc *directoryCache) isClosed() bool {
380410
return closed
381411
}
382412

383-
func (dc *directoryCache) cachePath(key string) string {
384-
return filepath.Join(dc.directory, key[:2], key)
385-
}
386-
387-
func (dc *directoryCache) wipFile(key string) (*os.File, error) {
388-
return os.CreateTemp(dc.wipDirectory, key+"-*")
389-
}
390-
391413
func NewMemoryCache() BlobCache {
392414
return &MemoryCache{
393415
Membuf: map[string]*bytes.Buffer{},
@@ -463,3 +485,50 @@ func (w *writeCloser) Close() error { return w.closeFunc() }
463485
func nopWriteCloser(w io.Writer) io.WriteCloser {
464486
return &writeCloser{w, func() error { return nil }}
465487
}
488+
489+
// wrapMemoryWriter wraps a writer with memory caching
490+
func (dc *directoryCache) wrapMemoryWriter(b *bytes.Buffer, w *writer, key string) (Writer, error) {
491+
return &writer{
492+
WriteCloser: nopWriteCloser(b),
493+
commitFunc: func() error {
494+
if dc.isClosed() {
495+
w.Close()
496+
return fmt.Errorf("cache is already closed")
497+
}
498+
499+
cached, done, added := dc.cache.Add(key, b)
500+
if !added {
501+
dc.putBuffer(b)
502+
}
503+
504+
commit := func() error {
505+
defer done()
506+
defer w.Close()
507+
508+
n, err := w.Write(cached.(*bytes.Buffer).Bytes())
509+
if err != nil || n != cached.(*bytes.Buffer).Len() {
510+
w.Abort()
511+
return err
512+
}
513+
return w.Commit()
514+
}
515+
516+
if dc.syncAdd {
517+
return commit()
518+
}
519+
520+
go func() {
521+
if err := commit(); err != nil {
522+
log.L.Infof("failed to commit to file: %v", err)
523+
}
524+
}()
525+
return nil
526+
},
527+
abortFunc: func() error {
528+
defer w.Close()
529+
defer w.Abort()
530+
dc.putBuffer(b)
531+
return nil
532+
},
533+
}, nil
534+
}

0 commit comments

Comments
 (0)