@@ -18,13 +18,13 @@ package cache
18
18
19
19
import (
20
20
"bytes"
21
- "errors"
22
21
"fmt"
23
22
"io"
24
23
"os"
25
24
"path/filepath"
26
25
"sync"
27
26
27
+ "github.com/containerd/log"
28
28
"github.com/containerd/stargz-snapshotter/util/cacheutil"
29
29
"github.com/containerd/stargz-snapshotter/util/namedmutex"
30
30
)
@@ -61,6 +61,9 @@ type DirectoryCacheConfig struct {
61
61
// Direct forcefully enables direct mode for all operations in the cache.
62
62
// Thus operations won't use in-memory caches.
63
63
Direct bool
64
+
65
+ // EnableHardlink enables hardlinking of cache files to reduce memory usage
66
+ EnableHardlink bool
64
67
}
65
68
66
69
// TODO: contents validation.
@@ -99,6 +102,7 @@ type Writer interface {
99
102
// cacheOpt holds the per-call settings that Option functions fill in before
// a cache operation runs.
type cacheOpt struct {
	// direct makes the call bypass the in-memory cache layer.
	// passThrough is the flag for the PassThrough option; its consumers are
	// outside this view (NOTE(review): confirm semantics against the callers).
	direct, passThrough bool

	// chunkDigest is the content digest supplied through the ChunkDigest
	// option; an empty string means no digest was provided.
	chunkDigest string
}
103
107
104
108
// Option customizes a single cache operation. It receives the options
// accumulated so far and returns the options to use from then on; callers
// apply options in order as `opt = o(opt)`.
type Option func (o * cacheOpt ) * cacheOpt
@@ -123,6 +127,14 @@ func PassThrough() Option {
123
127
}
124
128
}
125
129
130
+ // ChunkDigest option allows specifying a chunk digest for the cache
131
+ func ChunkDigest (digest string ) Option {
132
+ return func (o * cacheOpt ) * cacheOpt {
133
+ o .chunkDigest = digest
134
+ return o
135
+ }
136
+ }
137
+
126
138
func NewDirectoryCache (directory string , config DirectoryCacheConfig ) (BlobCache , error ) {
127
139
if ! filepath .IsAbs (directory ) {
128
140
return nil , fmt .Errorf ("dir cache path must be an absolute path; got %q" , directory )
@@ -166,15 +178,24 @@ func NewDirectoryCache(directory string, config DirectoryCacheConfig) (BlobCache
166
178
return nil , err
167
179
}
168
180
dc := & directoryCache {
169
- cache : dataCache ,
170
- fileCache : fdCache ,
171
- wipLock : new (namedmutex.NamedMutex ),
172
- directory : directory ,
173
- wipDirectory : wipdir ,
174
- bufPool : bufPool ,
175
- direct : config .Direct ,
181
+ cache : dataCache ,
182
+ fileCache : fdCache ,
183
+ wipLock : new (namedmutex.NamedMutex ),
184
+ directory : directory ,
185
+ wipDirectory : wipdir ,
186
+ bufPool : bufPool ,
187
+ direct : config .Direct ,
188
+ enableHardlink : config .EnableHardlink ,
189
+ syncAdd : config .SyncAdd ,
190
+ }
191
+
192
+ // Initialize hardlink manager if enabled
193
+ if config .EnableHardlink {
194
+ hlManager , enabled := InitializeHardlinkManager (filepath .Dir (filepath .Dir (directory )), config .EnableHardlink )
195
+ dc .hlManager = hlManager
196
+ dc .enableHardlink = enabled
176
197
}
177
- dc . syncAdd = config . SyncAdd
198
+
178
199
return dc , nil
179
200
}
180
201
@@ -193,6 +214,9 @@ type directoryCache struct {
193
214
194
215
closed bool
195
216
closedMu sync.Mutex
217
+
218
+ enableHardlink bool
219
+ hlManager * HardlinkManager
196
220
}
197
221
198
222
func (dc * directoryCache ) Get (key string , opts ... Option ) (Reader , error ) {
@@ -205,9 +229,15 @@ func (dc *directoryCache) Get(key string, opts ...Option) (Reader, error) {
205
229
opt = o (opt )
206
230
}
207
231
232
+ // Try to get from memory cache
208
233
if ! dc .direct && ! opt .direct {
209
- // Get data from memory
210
- if b , done , ok := dc .cache .Get (key ); ok {
234
+ // Try memory cache for digest or key
235
+ cacheKey := key
236
+ if dc .hlManager != nil && dc .hlManager .IsEnabled () && opt .chunkDigest != "" {
237
+ cacheKey = opt .chunkDigest
238
+ }
239
+
240
+ if b , done , ok := dc .cache .Get (cacheKey ); ok {
211
241
return & reader {
212
242
ReaderAt : bytes .NewReader (b .(* bytes.Buffer ).Bytes ()),
213
243
closeFunc : func () error {
@@ -217,8 +247,8 @@ func (dc *directoryCache) Get(key string, opts ...Option) (Reader, error) {
217
247
}, nil
218
248
}
219
249
220
- // Get data from disk. If the file is already opened, use it.
221
- if f , done , ok := dc .fileCache .Get (key ); ok {
250
+ // Get data from file cache for digest or key
251
+ if f , done , ok := dc .fileCache .Get (cacheKey ); ok {
222
252
return & reader {
223
253
ReaderAt : f .(* os.File ),
224
254
closeFunc : func () error {
@@ -229,10 +259,21 @@ func (dc *directoryCache) Get(key string, opts ...Option) (Reader, error) {
229
259
}
230
260
}
231
261
262
+ // First try regular file path
263
+ filepath := BuildCachePath (dc .directory , key )
264
+
265
+ // Check hardlink manager for existing digest file
266
+ if dc .hlManager != nil && opt .chunkDigest != "" {
267
+ if digestPath , exists := dc .hlManager .ProcessCacheGet (key , opt .chunkDigest , opt .direct ); exists {
268
+ log .L .Debugf ("Using existing file for digest %q instead of key %q" , opt .chunkDigest , key )
269
+ filepath = digestPath
270
+ }
271
+ }
272
+
232
273
// Open the cache file and read the target region
233
274
// TODO: If the target cache is write-in-progress, should we wait for the completion
234
275
// or simply report the cache miss?
235
- file , err := os .Open (dc . cachePath ( key ) )
276
+ file , err := os .Open (filepath )
236
277
if err != nil {
237
278
return nil , fmt .Errorf ("failed to open blob file for %q: %w" , key , err )
238
279
}
@@ -261,7 +302,12 @@ func (dc *directoryCache) Get(key string, opts ...Option) (Reader, error) {
261
302
return & reader {
262
303
ReaderAt : file ,
263
304
closeFunc : func () error {
264
- _ , done , added := dc .fileCache .Add (key , file )
305
+ cacheKey := key
306
+ if dc .hlManager != nil && dc .hlManager .IsEnabled () && opt .chunkDigest != "" {
307
+ cacheKey = opt .chunkDigest
308
+ }
309
+
310
+ _ , done , added := dc .fileCache .Add (cacheKey , file )
265
311
defer done () // Release it immediately. Cleaned up on eviction.
266
312
if ! added {
267
313
return file .Close () // file already exists in the cache. close it.
@@ -281,81 +327,76 @@ func (dc *directoryCache) Add(key string, opts ...Option) (Writer, error) {
281
327
opt = o (opt )
282
328
}
283
329
284
- wip , err := dc .wipFile (key )
330
+ // If hardlink manager exists and digest is provided, check if a hardlink can be created
331
+ if dc .hlManager != nil && opt .chunkDigest != "" {
332
+ keyPath := BuildCachePath (dc .directory , key )
333
+
334
+ // Try to create a hardlink from existing digest file
335
+ if dc .hlManager .ProcessCacheAdd (key , opt .chunkDigest , keyPath ) {
336
+ // Return a no-op writer since the file already exists
337
+ return & writer {
338
+ WriteCloser : nopWriteCloser (io .Discard ),
339
+ commitFunc : func () error { return nil },
340
+ abortFunc : func () error { return nil },
341
+ }, nil
342
+ }
343
+ }
344
+
345
+ // Create temporary file
346
+ w , err := WipFile (dc .wipDirectory , key )
285
347
if err != nil {
286
348
return nil , err
287
349
}
288
- w := & writer {
289
- WriteCloser : wip ,
350
+
351
+ // Create writer
352
+ writer := & writer {
353
+ WriteCloser : w ,
290
354
commitFunc : func () error {
291
355
if dc .isClosed () {
292
356
return fmt .Errorf ("cache is already closed" )
293
357
}
294
- // Commit the cache contents
295
- c := dc .cachePath (key )
296
- if err := os .MkdirAll (filepath .Dir (c ), os .ModePerm ); err != nil {
297
- var errs []error
298
- if err := os .Remove (wip .Name ()); err != nil {
299
- errs = append (errs , err )
358
+
359
+ // Commit file
360
+ targetPath := BuildCachePath (dc .directory , key )
361
+ if err := os .MkdirAll (filepath .Dir (targetPath ), 0700 ); err != nil {
362
+ return fmt .Errorf ("failed to create cache directory: %w" , err )
363
+ }
364
+
365
+ if err := os .Rename (w .Name (), targetPath ); err != nil {
366
+ return fmt .Errorf ("failed to commit cache file: %w" , err )
367
+ }
368
+
369
+ // If hardlink manager exists and digest is provided, register the file
370
+ if dc .hlManager != nil && dc .hlManager .IsEnabled () && opt .chunkDigest != "" {
371
+ // Register this file as the primary source for this digest
372
+ if err := dc .hlManager .RegisterDigestFile (opt .chunkDigest , targetPath ); err != nil {
373
+ log .L .Debugf ("Failed to register digest file: %v" , err )
374
+ }
375
+
376
+ // Map key to digest
377
+ internalKey := dc .hlManager .GenerateInternalKey (dc .directory , key )
378
+ if err := dc .hlManager .MapKeyToDigest (internalKey , opt .chunkDigest ); err != nil {
379
+ log .L .Debugf ("Failed to map key to digest: %v" , err )
300
380
}
301
- errs = append (errs , fmt .Errorf ("failed to create cache directory %q: %w" , c , err ))
302
- return errors .Join (errs ... )
303
381
}
304
- return os .Rename (wip .Name (), c )
382
+
383
+ return nil
305
384
},
306
385
abortFunc : func () error {
307
- return os .Remove (wip .Name ())
386
+ return os .Remove (w .Name ())
308
387
},
309
388
}
310
389
311
390
// If "direct" option is specified, do not cache the passed data on memory.
312
391
// This option is useful for preventing memory cache from being polluted by data
313
392
// that won't be accessed immediately.
314
393
if dc .direct || opt .direct {
315
- return w , nil
394
+ return writer , nil
316
395
}
317
396
397
+ // Create memory cache
318
398
b := dc .bufPool .Get ().(* bytes.Buffer )
319
- memW := & writer {
320
- WriteCloser : nopWriteCloser (io .Writer (b )),
321
- commitFunc : func () error {
322
- if dc .isClosed () {
323
- w .Close ()
324
- return fmt .Errorf ("cache is already closed" )
325
- }
326
- cached , done , added := dc .cache .Add (key , b )
327
- if ! added {
328
- dc .putBuffer (b ) // already exists in the cache. abort it.
329
- }
330
- commit := func () error {
331
- defer done ()
332
- defer w .Close ()
333
- n , err := w .Write (cached .(* bytes.Buffer ).Bytes ())
334
- if err != nil || n != cached .(* bytes.Buffer ).Len () {
335
- w .Abort ()
336
- return err
337
- }
338
- return w .Commit ()
339
- }
340
- if dc .syncAdd {
341
- return commit ()
342
- }
343
- go func () {
344
- if err := commit (); err != nil {
345
- fmt .Println ("failed to commit to file:" , err )
346
- }
347
- }()
348
- return nil
349
- },
350
- abortFunc : func () error {
351
- defer w .Close ()
352
- defer w .Abort ()
353
- dc .putBuffer (b ) // abort it.
354
- return nil
355
- },
356
- }
357
-
358
- return memW , nil
399
+ return dc .wrapMemoryWriter (b , writer , key )
359
400
}
360
401
361
402
func (dc * directoryCache ) putBuffer (b * bytes.Buffer ) {
@@ -380,14 +421,6 @@ func (dc *directoryCache) isClosed() bool {
380
421
return closed
381
422
}
382
423
383
- func (dc * directoryCache ) cachePath (key string ) string {
384
- return filepath .Join (dc .directory , key [:2 ], key )
385
- }
386
-
387
- func (dc * directoryCache ) wipFile (key string ) (* os.File , error ) {
388
- return os .CreateTemp (dc .wipDirectory , key + "-*" )
389
- }
390
-
391
424
func NewMemoryCache () BlobCache {
392
425
return & MemoryCache {
393
426
Membuf : map [string ]* bytes.Buffer {},
@@ -463,3 +496,50 @@ func (w *writeCloser) Close() error { return w.closeFunc() }
463
496
func nopWriteCloser (w io.Writer ) io.WriteCloser {
464
497
return & writeCloser {w , func () error { return nil }}
465
498
}
499
+
500
+ // wrapMemoryWriter wraps a writer with memory caching
501
+ func (dc * directoryCache ) wrapMemoryWriter (b * bytes.Buffer , w * writer , key string ) (Writer , error ) {
502
+ return & writer {
503
+ WriteCloser : nopWriteCloser (b ),
504
+ commitFunc : func () error {
505
+ if dc .isClosed () {
506
+ w .Close ()
507
+ return fmt .Errorf ("cache is already closed" )
508
+ }
509
+
510
+ cached , done , added := dc .cache .Add (key , b )
511
+ if ! added {
512
+ dc .putBuffer (b )
513
+ }
514
+
515
+ commit := func () error {
516
+ defer done ()
517
+ defer w .Close ()
518
+
519
+ n , err := w .Write (cached .(* bytes.Buffer ).Bytes ())
520
+ if err != nil || n != cached .(* bytes.Buffer ).Len () {
521
+ w .Abort ()
522
+ return err
523
+ }
524
+ return w .Commit ()
525
+ }
526
+
527
+ if dc .syncAdd {
528
+ return commit ()
529
+ }
530
+
531
+ go func () {
532
+ if err := commit (); err != nil {
533
+ log .L .Infof ("failed to commit to file: %v" , err )
534
+ }
535
+ }()
536
+ return nil
537
+ },
538
+ abortFunc : func () error {
539
+ defer w .Close ()
540
+ defer w .Abort ()
541
+ dc .putBuffer (b )
542
+ return nil
543
+ },
544
+ }, nil
545
+ }
0 commit comments