@@ -18,13 +18,13 @@ package cache
18
18
19
19
import (
20
20
"bytes"
21
- "errors"
22
21
"fmt"
23
22
"io"
24
23
"os"
25
24
"path/filepath"
26
25
"sync"
27
26
27
+ "github.com/containerd/log"
28
28
"github.com/containerd/stargz-snapshotter/util/cacheutil"
29
29
"github.com/containerd/stargz-snapshotter/util/namedmutex"
30
30
)
@@ -61,6 +61,9 @@ type DirectoryCacheConfig struct {
61
61
// Direct forcefully enables direct mode for all operation in cache.
62
62
// Thus operation won't use on-memory caches.
63
63
Direct bool
64
+
65
+ // EnableHardlink enables hardlinking of cache files to reduce memory usage
66
+ EnableHardlink bool
64
67
}
65
68
66
69
// TODO: contents validation.
@@ -99,6 +102,7 @@ type Writer interface {
99
102
type cacheOpt struct {
100
103
direct bool
101
104
passThrough bool
105
+ chunkDigest string
102
106
}
103
107
104
108
type Option func (o * cacheOpt ) * cacheOpt
@@ -123,6 +127,14 @@ func PassThrough() Option {
123
127
}
124
128
}
125
129
130
+ // ChunkDigest option allows specifying a chunk digest for the cache
131
+ func ChunkDigest (digest string ) Option {
132
+ return func (o * cacheOpt ) * cacheOpt {
133
+ o .chunkDigest = digest
134
+ return o
135
+ }
136
+ }
137
+
126
138
func NewDirectoryCache (directory string , config DirectoryCacheConfig ) (BlobCache , error ) {
127
139
if ! filepath .IsAbs (directory ) {
128
140
return nil , fmt .Errorf ("dir cache path must be an absolute path; got %q" , directory )
@@ -166,15 +178,24 @@ func NewDirectoryCache(directory string, config DirectoryCacheConfig) (BlobCache
166
178
return nil , err
167
179
}
168
180
dc := & directoryCache {
169
- cache : dataCache ,
170
- fileCache : fdCache ,
171
- wipLock : new (namedmutex.NamedMutex ),
172
- directory : directory ,
173
- wipDirectory : wipdir ,
174
- bufPool : bufPool ,
175
- direct : config .Direct ,
181
+ cache : dataCache ,
182
+ fileCache : fdCache ,
183
+ wipLock : new (namedmutex.NamedMutex ),
184
+ directory : directory ,
185
+ wipDirectory : wipdir ,
186
+ bufPool : bufPool ,
187
+ direct : config .Direct ,
188
+ enableHardlink : config .EnableHardlink ,
189
+ syncAdd : config .SyncAdd ,
190
+ }
191
+
192
+ // Initialize hardlink manager if enabled
193
+ if config .EnableHardlink {
194
+ hlManager , enabled := InitializeHardlinkManager (filepath .Dir (filepath .Dir (directory )), config .EnableHardlink )
195
+ dc .hlManager = hlManager
196
+ dc .enableHardlink = enabled
176
197
}
177
- dc . syncAdd = config . SyncAdd
198
+
178
199
return dc , nil
179
200
}
180
201
@@ -193,6 +214,9 @@ type directoryCache struct {
193
214
194
215
closed bool
195
216
closedMu sync.Mutex
217
+
218
+ enableHardlink bool
219
+ hlManager * HardlinkManager
196
220
}
197
221
198
222
func (dc * directoryCache ) Get (key string , opts ... Option ) (Reader , error ) {
@@ -205,9 +229,15 @@ func (dc *directoryCache) Get(key string, opts ...Option) (Reader, error) {
205
229
opt = o (opt )
206
230
}
207
231
232
+ // Try to get from memory cache
208
233
if ! dc .direct && ! opt .direct {
209
- // Get data from memory
210
- if b , done , ok := dc .cache .Get (key ); ok {
234
+ // Try memory cache for digest or key
235
+ cacheKey := key
236
+ if dc .hlManager != nil && dc .hlManager .IsEnabled () && opt .chunkDigest != "" {
237
+ cacheKey = opt .chunkDigest
238
+ }
239
+
240
+ if b , done , ok := dc .cache .Get (cacheKey ); ok {
211
241
return & reader {
212
242
ReaderAt : bytes .NewReader (b .(* bytes.Buffer ).Bytes ()),
213
243
closeFunc : func () error {
@@ -217,8 +247,8 @@ func (dc *directoryCache) Get(key string, opts ...Option) (Reader, error) {
217
247
}, nil
218
248
}
219
249
220
- // Get data from disk. If the file is already opened, use it.
221
- if f , done , ok := dc .fileCache .Get (key ); ok {
250
+ // Get data from file cache for digest or key
251
+ if f , done , ok := dc .fileCache .Get (cacheKey ); ok {
222
252
return & reader {
223
253
ReaderAt : f .(* os.File ),
224
254
closeFunc : func () error {
@@ -229,24 +259,28 @@ func (dc *directoryCache) Get(key string, opts ...Option) (Reader, error) {
229
259
}
230
260
}
231
261
262
+ // First try regular file path
263
+ filepath := BuildCachePath (dc .directory , key )
264
+
265
+ // Check hardlink manager for existing digest file
266
+ if dc .hlManager != nil && opt .chunkDigest != "" {
267
+ if digestPath , exists := dc .hlManager .ProcessCacheGet (key , opt .chunkDigest , opt .direct ); exists {
268
+ log .L .Debugf ("Using existing file for digest %q instead of key %q" , opt .chunkDigest , key )
269
+ filepath = digestPath
270
+ }
271
+ }
272
+
232
273
// Open the cache file and read the target region
233
- // TODO: If the target cache is write-in-progress, should we wait for the completion
234
- // or simply report the cache miss?
235
- file , err := os .Open (dc .cachePath (key ))
274
+ file , err := os .Open (filepath )
236
275
if err != nil {
237
276
return nil , fmt .Errorf ("failed to open blob file for %q: %w" , key , err )
238
277
}
239
278
240
- // If "direct" option is specified, do not cache the file on memory.
241
- // This option is useful for preventing memory cache from being polluted by data
242
- // that won't be accessed immediately.
279
+ // If in direct mode, don't cache file descriptor
243
280
if dc .direct || opt .direct {
244
281
return & reader {
245
282
ReaderAt : file ,
246
283
closeFunc : func () error {
247
- // In passthough model, close will be toke over by go-fuse
248
- // If "passThrough" option is specified, "direct" option also will
249
- // be specified, so adding this branch here is enough
250
284
if opt .passThrough {
251
285
return nil
252
286
}
@@ -255,16 +289,19 @@ func (dc *directoryCache) Get(key string, opts ...Option) (Reader, error) {
255
289
}, nil
256
290
}
257
291
258
- // TODO: should we cache the entire file data on memory?
259
- // but making I/O (possibly huge) on every fetching
260
- // might be costly.
292
+ // Cache file descriptor
261
293
return & reader {
262
294
ReaderAt : file ,
263
295
closeFunc : func () error {
264
- _ , done , added := dc .fileCache .Add (key , file )
265
- defer done () // Release it immediately. Cleaned up on eviction.
296
+ cacheKey := key
297
+ if dc .hlManager != nil && dc .hlManager .IsEnabled () && opt .chunkDigest != "" {
298
+ cacheKey = opt .chunkDigest
299
+ }
300
+
301
+ _ , done , added := dc .fileCache .Add (cacheKey , file )
302
+ defer done ()
266
303
if ! added {
267
- return file .Close () // file already exists in the cache. close it.
304
+ return file .Close ()
268
305
}
269
306
return nil
270
307
},
@@ -281,81 +318,74 @@ func (dc *directoryCache) Add(key string, opts ...Option) (Writer, error) {
281
318
opt = o (opt )
282
319
}
283
320
284
- wip , err := dc .wipFile (key )
321
+ // If hardlink manager exists and digest is provided, check if a hardlink can be created
322
+ if dc .hlManager != nil && opt .chunkDigest != "" {
323
+ keyPath := BuildCachePath (dc .directory , key )
324
+
325
+ // Try to create a hardlink from existing digest file
326
+ if dc .hlManager .ProcessCacheAdd (key , opt .chunkDigest , keyPath ) {
327
+ // Return a no-op writer since the file already exists
328
+ return & writer {
329
+ WriteCloser : nopWriteCloser (io .Discard ),
330
+ commitFunc : func () error { return nil },
331
+ abortFunc : func () error { return nil },
332
+ }, nil
333
+ }
334
+ }
335
+
336
+ // Create temporary file
337
+ w , err := WipFile (dc .wipDirectory , key )
285
338
if err != nil {
286
339
return nil , err
287
340
}
288
- w := & writer {
289
- WriteCloser : wip ,
341
+
342
+ // Create writer
343
+ writer := & writer {
344
+ WriteCloser : w ,
290
345
commitFunc : func () error {
291
346
if dc .isClosed () {
292
347
return fmt .Errorf ("cache is already closed" )
293
348
}
294
- // Commit the cache contents
295
- c := dc .cachePath (key )
296
- if err := os .MkdirAll (filepath .Dir (c ), os .ModePerm ); err != nil {
297
- var errs []error
298
- if err := os .Remove (wip .Name ()); err != nil {
299
- errs = append (errs , err )
300
- }
301
- errs = append (errs , fmt .Errorf ("failed to create cache directory %q: %w" , c , err ))
302
- return errors .Join (errs ... )
303
- }
304
- return os .Rename (wip .Name (), c )
305
- },
306
- abortFunc : func () error {
307
- return os .Remove (wip .Name ())
308
- },
309
- }
310
-
311
- // If "direct" option is specified, do not cache the passed data on memory.
312
- // This option is useful for preventing memory cache from being polluted by data
313
- // that won't be accessed immediately.
314
- if dc .direct || opt .direct {
315
- return w , nil
316
- }
317
349
318
- b := dc .bufPool .Get ().(* bytes.Buffer )
319
- memW := & writer {
320
- WriteCloser : nopWriteCloser (io .Writer (b )),
321
- commitFunc : func () error {
322
- if dc .isClosed () {
323
- w .Close ()
324
- return fmt .Errorf ("cache is already closed" )
350
+ // Commit file
351
+ targetPath := BuildCachePath (dc .directory , key )
352
+ if err := os .MkdirAll (filepath .Dir (targetPath ), 0700 ); err != nil {
353
+ return fmt .Errorf ("failed to create cache directory: %w" , err )
325
354
}
326
- cached , done , added := dc . cache . Add ( key , b )
327
- if ! added {
328
- dc . putBuffer ( b ) // already exists in the cache. abort it.
355
+
356
+ if err := os . Rename ( w . Name (), targetPath ); err != nil {
357
+ return fmt . Errorf ( "failed to commit cache file: %w" , err )
329
358
}
330
- commit := func () error {
331
- defer done ()
332
- defer w .Close ()
333
- n , err := w .Write (cached .(* bytes.Buffer ).Bytes ())
334
- if err != nil || n != cached .(* bytes.Buffer ).Len () {
335
- w .Abort ()
336
- return err
359
+
360
+ // If hardlink manager exists and digest is provided, register the file
361
+ if dc .hlManager != nil && dc .hlManager .IsEnabled () && opt .chunkDigest != "" {
362
+ // Register this file as the primary source for this digest
363
+ if err := dc .hlManager .RegisterDigestFile (opt .chunkDigest , targetPath ); err != nil {
364
+ log .L .Debugf ("Failed to register digest file: %v" , err )
337
365
}
338
- return w .Commit ()
339
- }
340
- if dc .syncAdd {
341
- return commit ()
342
- }
343
- go func () {
344
- if err := commit (); err != nil {
345
- fmt .Println ("failed to commit to file:" , err )
366
+
367
+ // Map key to digest
368
+ internalKey := dc .hlManager .GenerateInternalKey (dc .directory , key )
369
+ if err := dc .hlManager .MapKeyToDigest (internalKey , opt .chunkDigest ); err != nil {
370
+ log .L .Debugf ("Failed to map key to digest: %v" , err )
346
371
}
347
- }()
372
+ }
373
+
348
374
return nil
349
375
},
350
376
abortFunc : func () error {
351
- defer w .Close ()
352
- defer w .Abort ()
353
- dc .putBuffer (b ) // abort it.
354
- return nil
377
+ return os .Remove (w .Name ())
355
378
},
356
379
}
357
380
358
- return memW , nil
381
+ // Return directly if in direct mode
382
+ if dc .direct || opt .direct {
383
+ return writer , nil
384
+ }
385
+
386
+ // Create memory cache
387
+ b := dc .bufPool .Get ().(* bytes.Buffer )
388
+ return dc .wrapMemoryWriter (b , writer , key )
359
389
}
360
390
361
391
func (dc * directoryCache ) putBuffer (b * bytes.Buffer ) {
@@ -380,14 +410,6 @@ func (dc *directoryCache) isClosed() bool {
380
410
return closed
381
411
}
382
412
383
- func (dc * directoryCache ) cachePath (key string ) string {
384
- return filepath .Join (dc .directory , key [:2 ], key )
385
- }
386
-
387
- func (dc * directoryCache ) wipFile (key string ) (* os.File , error ) {
388
- return os .CreateTemp (dc .wipDirectory , key + "-*" )
389
- }
390
-
391
413
func NewMemoryCache () BlobCache {
392
414
return & MemoryCache {
393
415
Membuf : map [string ]* bytes.Buffer {},
@@ -463,3 +485,50 @@ func (w *writeCloser) Close() error { return w.closeFunc() }
463
485
func nopWriteCloser (w io.Writer ) io.WriteCloser {
464
486
return & writeCloser {w , func () error { return nil }}
465
487
}
488
+
489
+ // wrapMemoryWriter wraps a writer with memory caching
490
+ func (dc * directoryCache ) wrapMemoryWriter (b * bytes.Buffer , w * writer , key string ) (Writer , error ) {
491
+ return & writer {
492
+ WriteCloser : nopWriteCloser (b ),
493
+ commitFunc : func () error {
494
+ if dc .isClosed () {
495
+ w .Close ()
496
+ return fmt .Errorf ("cache is already closed" )
497
+ }
498
+
499
+ cached , done , added := dc .cache .Add (key , b )
500
+ if ! added {
501
+ dc .putBuffer (b )
502
+ }
503
+
504
+ commit := func () error {
505
+ defer done ()
506
+ defer w .Close ()
507
+
508
+ n , err := w .Write (cached .(* bytes.Buffer ).Bytes ())
509
+ if err != nil || n != cached .(* bytes.Buffer ).Len () {
510
+ w .Abort ()
511
+ return err
512
+ }
513
+ return w .Commit ()
514
+ }
515
+
516
+ if dc .syncAdd {
517
+ return commit ()
518
+ }
519
+
520
+ go func () {
521
+ if err := commit (); err != nil {
522
+ log .L .Infof ("failed to commit to file: %v" , err )
523
+ }
524
+ }()
525
+ return nil
526
+ },
527
+ abortFunc : func () error {
528
+ defer w .Close ()
529
+ defer w .Abort ()
530
+ dc .putBuffer (b )
531
+ return nil
532
+ },
533
+ }, nil
534
+ }
0 commit comments