Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit b96cf48

Browse files
committedJan 22, 2024
unawareness prefetch implementation on snapshotter side
1. send post request to optimizer 2. store prefetchlist 3. add prefetchlist in nydusd
1 parent 2331e91 commit b96cf48

File tree

8 files changed

+133
-61
lines changed

8 files changed

+133
-61
lines changed
 

‎config/global.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ type GlobalConfig struct {
3838
DaemonThreadsNum int
3939
CacheGCPeriod time.Duration
4040
MirrorsConfig MirrorsConfig
41+
PrefetchRoot string
4142
}
4243

4344
func IsFusedevSharedModeEnabled() bool {
@@ -64,6 +65,10 @@ func GetConfigRoot() string {
6465
return globalConfig.ConfigRoot
6566
}
6667

68+
func GetPrefetchRoot() string {
69+
return globalConfig.PrefetchRoot
70+
}
71+
6772
func GetMirrorsConfigDir() string {
6873
return globalConfig.MirrorsConfig.Dir
6974
}
@@ -181,6 +186,7 @@ func ProcessConfigurations(c *SnapshotterConfig) error {
181186
globalConfig.ConfigRoot = filepath.Join(c.Root, "config")
182187
globalConfig.SocketRoot = filepath.Join(c.Root, "socket")
183188
globalConfig.RootMountpoint = filepath.Join(c.Root, "mnt")
189+
globalConfig.PrefetchRoot = filepath.Join(c.Root, "prefetch")
184190

185191
globalConfig.MirrorsConfig = c.RemoteConfig.MirrorsConfig
186192

‎pkg/daemon/config.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616

1717
"github.com/containerd/nydus-snapshotter/config"
1818
"github.com/containerd/nydus-snapshotter/internal/constant"
19+
"github.com/containerd/nydus-snapshotter/pkg/prefetch"
1920
)
2021

2122
// Build runtime nydusd daemon object, which might be persisted later
@@ -31,6 +32,20 @@ func WithSocketDir(dir string) NewDaemonOpt {
3132
}
3233
}
3334

35+
func WithPrefetchDir(dir, imageID string) NewDaemonOpt {
36+
return func(d *Daemon) error {
37+
s := filepath.Join(dir, d.ID())
38+
prefetchDir, err := prefetch.GetPrefetchList(s, imageID)
39+
if err != nil && !errors.Is(err, prefetch.ErrUds) {
40+
return errors.Wrapf(err, "failed to get prefetchList for image %s in path %s", imageID, s)
41+
}
42+
if prefetchDir != "" {
43+
d.States.PrefetchDir = prefetchDir
44+
}
45+
return nil
46+
}
47+
}
48+
3449
func WithRef(ref int32) NewDaemonOpt {
3550
return func(d *Daemon) error {
3651
d.ref = ref

‎pkg/daemon/daemon.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,8 @@ type ConfigState struct {
5454
SupervisorPath string
5555
ThreadNum int
5656
// Where the configuration file resides, all rafs instances share the same configuration template
57-
ConfigDir string
57+
ConfigDir string
58+
PrefetchDir string
5859
}
5960

6061
// TODO: Record queried nydusd state

‎pkg/filesystem/fs.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -285,7 +285,7 @@ func (fs *Filesystem) Mount(ctx context.Context, snapshotID string, labels map[s
285285
if err != nil {
286286
return err
287287
}
288-
d, err = fs.createDaemon(fsManager, config.DaemonModeDedicated, mp, 0)
288+
d, err = fs.createDaemon(fsManager, config.DaemonModeDedicated, mp, 0, imageID)
289289
// if daemon already exists for snapshotID, just return
290290
if err != nil && !errdefs.IsAlreadyExists(err) {
291291
return err
@@ -578,7 +578,7 @@ func (fs *Filesystem) initSharedDaemon(fsManager *manager.Manager) (err error) {
578578
return errors.Errorf("got null mountpoint for fsDriver %s", fsManager.FsDriver)
579579
}
580580

581-
d, err := fs.createDaemon(fsManager, daemonMode, mp, 0)
581+
d, err := fs.createDaemon(fsManager, daemonMode, mp, 0, "")
582582
if err != nil {
583583
return errors.Wrap(err, "initialize shared daemon")
584584
}
@@ -612,7 +612,7 @@ func (fs *Filesystem) initSharedDaemon(fsManager *manager.Manager) (err error) {
612612

613613
// createDaemon create new nydus daemon by snapshotID and imageID
614614
func (fs *Filesystem) createDaemon(fsManager *manager.Manager, daemonMode config.DaemonMode,
615-
mountpoint string, ref int32) (d *daemon.Daemon, err error) {
615+
mountpoint string, ref int32, imageID string) (d *daemon.Daemon, err error) {
616616
opts := []daemon.NewDaemonOpt{
617617
daemon.WithRef(ref),
618618
daemon.WithSocketDir(config.GetSocketRoot()),
@@ -631,6 +631,10 @@ func (fs *Filesystem) createDaemon(fsManager *manager.Manager, daemonMode config
631631
opts = append(opts, daemon.WithMountpoint(mountpoint))
632632
}
633633

634+
if imageID != "" {
635+
opts = append(opts, daemon.WithPrefetchDir(config.GetPrefetchRoot(), imageID))
636+
}
637+
634638
d, err = daemon.NewDaemon(opts...)
635639
if err != nil {
636640
return nil, errors.Wrapf(err, "new daemon")

‎pkg/manager/daemon_adaptor.go

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ import (
2323
"github.com/containerd/nydus-snapshotter/pkg/errdefs"
2424
"github.com/containerd/nydus-snapshotter/pkg/metrics/collector"
2525
metrics "github.com/containerd/nydus-snapshotter/pkg/metrics/tool"
26-
"github.com/containerd/nydus-snapshotter/pkg/prefetch"
2726
)
2827

2928
const endpointGetBackend string = "/api/v1/daemons/%s/backend"
@@ -122,7 +121,7 @@ func (m *Manager) StartDaemon(d *daemon.Daemon) error {
122121
// Build commandline according to nydusd daemon configuration.
123122
func (m *Manager) BuildDaemonCommand(d *daemon.Daemon, bin string, upgrade bool) (*exec.Cmd, error) {
124123
var cmdOpts []command.Opt
125-
var imageReference string
124+
// var imageReference string
126125

127126
nydusdThreadNum := d.NydusdThreadNum()
128127

@@ -148,7 +147,7 @@ func (m *Manager) BuildDaemonCommand(d *daemon.Daemon, bin string, upgrade bool)
148147
return nil, errors.Wrapf(errdefs.ErrNotFound, "daemon %s no rafs instance associated", d.ID())
149148
}
150149

151-
imageReference = rafs.ImageID
150+
// imageReference = rafs.ImageID
152151

153152
bootstrap, err := rafs.BootstrapFile()
154153
if err != nil {
@@ -176,12 +175,8 @@ func (m *Manager) BuildDaemonCommand(d *daemon.Daemon, bin string, upgrade bool)
176175
command.WithID(d.ID()))
177176
}
178177

179-
if imageReference != "" {
180-
prefetchfiles := prefetch.Pm.GetPrefetchInfo(imageReference)
181-
if prefetchfiles != "" {
182-
cmdOpts = append(cmdOpts, command.WithPrefetchFiles(prefetchfiles))
183-
prefetch.Pm.DeleteFromPrefetchMap(imageReference)
184-
}
178+
if d.States.PrefetchDir != "" {
179+
cmdOpts = append(cmdOpts, command.WithPrefetchFiles(d.States.PrefetchDir))
185180
}
186181

187182
cmdOpts = append(cmdOpts,

‎pkg/manager/manager.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -287,6 +287,10 @@ func (m *Manager) cleanUpDaemonResources(d *daemon.Daemon) {
287287
resource := []string{d.States.ConfigDir, d.States.LogDir}
288288
if !d.IsSharedDaemon() {
289289
socketDir := path.Dir(d.GetAPISock())
290+
if d.States.PrefetchDir != "" {
291+
prefetchDir := path.Dir(d.States.PrefetchDir)
292+
resource = append(resource, prefetchDir)
293+
}
290294
resource = append(resource, socketDir)
291295
}
292296

‎pkg/prefetch/prefetch.go

Lines changed: 95 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -7,54 +7,119 @@
77
package prefetch
88

99
import (
10+
"context"
1011
"encoding/json"
11-
"sync"
12+
"fmt"
13+
"io"
14+
"net"
15+
"net/http"
16+
"os"
17+
"path/filepath"
18+
"strings"
1219

1320
"github.com/containerd/containerd/log"
21+
"github.com/pkg/errors"
1422
)
1523

16-
type prefetchInfo struct {
17-
prefetchMap map[string]string
18-
prefetchMutex sync.Mutex
24+
type prefetchlist struct {
25+
FilePaths []string `json:"files"`
1926
}
2027

21-
var Pm prefetchInfo
28+
const (
29+
endpointPrefetch = "/api/v1/imagename"
30+
udsSocket = "/tmp/prefetch.sock"
31+
)
32+
33+
var ErrUds = errors.New("failed to connect unix domain socket")
2234

23-
func (p *prefetchInfo) SetPrefetchFiles(body []byte) error {
24-
p.prefetchMutex.Lock()
25-
defer p.prefetchMutex.Unlock()
35+
func GetPrefetchList(prefetchDir, imageRepo string) (string, error) {
36+
url := fmt.Sprintf("http://unix%s", endpointPrefetch)
2637

27-
var prefetchMsg []map[string]string
28-
if err := json.Unmarshal(body, &prefetchMsg); err != nil {
29-
return err
38+
req, err := http.NewRequest(http.MethodPost, url, strings.NewReader(imageRepo))
39+
if err != nil {
40+
return "", err
3041
}
3142

32-
if p.prefetchMap == nil {
33-
p.prefetchMap = make(map[string]string)
43+
client := &http.Client{
44+
Transport: &http.Transport{
45+
DialContext: func(_ context.Context, _, _ string) (net.Conn, error) {
46+
return net.Dial("unix", udsSocket)
47+
},
48+
},
3449
}
35-
for _, item := range prefetchMsg {
36-
image := item["image"]
37-
prefetchfiles := item["prefetch"]
38-
p.prefetchMap[image] = prefetchfiles
50+
resp, err := client.Do(req)
51+
if err != nil {
52+
log.L.Infof("failed to connect unix domain socket. Skipping prefetch for image: %s\n", imageRepo)
53+
return "", ErrUds
3954
}
55+
defer resp.Body.Close()
4056

41-
log.L.Infof("received prefetch list from nri plugin: %v ", p.prefetchMap)
42-
return nil
43-
}
57+
if resp.StatusCode != http.StatusOK {
58+
return "", fmt.Errorf("failed to send data, status code: %v", resp.StatusCode)
59+
}
4460

45-
func (p *prefetchInfo) GetPrefetchInfo(image string) string {
46-
p.prefetchMutex.Lock()
47-
defer p.prefetchMutex.Unlock()
61+
body, err := io.ReadAll(resp.Body)
62+
if err != nil {
63+
return "", err
64+
}
4865

49-
if prefetchfiles, ok := p.prefetchMap[image]; ok {
50-
return prefetchfiles
66+
if strings.Contains(string(body), "CacheItem not found") {
67+
log.L.Infof("Cache item not found for image: %s\n", imageRepo)
68+
return "", nil
5169
}
52-
return ""
70+
71+
prefetchfilePath, err := storePrefetchList(prefetchDir, body)
72+
if err != nil {
73+
return "", err
74+
}
75+
return prefetchfilePath, nil
5376
}
5477

55-
func (p *prefetchInfo) DeleteFromPrefetchMap(image string) {
56-
p.prefetchMutex.Lock()
57-
defer p.prefetchMutex.Unlock()
78+
func storePrefetchList(prefetchDir string, list []byte) (string, error) {
79+
if err := os.MkdirAll(prefetchDir, 0755); err != nil {
80+
return "", errors.Wrapf(err, "create prefetch dir %s", prefetchDir)
81+
}
82+
83+
filePath := filepath.Join(prefetchDir, "prefetchList")
84+
jsonfilePath := filepath.Join(prefetchDir, "prefetchList.json")
85+
86+
file, err := os.OpenFile(filePath, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0644)
87+
if err != nil {
88+
fmt.Println("Error opening file:", err)
89+
return "", errors.Wrap(err, "error opening prefetch file")
90+
}
91+
defer file.Close()
92+
93+
var prefetchSlice []string
94+
err = json.Unmarshal(list, &prefetchSlice)
95+
if err != nil {
96+
return "", errors.Wrap(err, "failed to parse prefetch list")
97+
}
98+
99+
for _, path := range prefetchSlice {
100+
content := path + "\n"
101+
_, err := file.WriteString(content)
102+
if err != nil {
103+
return "", errors.Wrap(err, "error writing to prefetch file")
104+
}
105+
}
106+
107+
prefetchStruct := prefetchlist{FilePaths: prefetchSlice}
108+
jsonByte, err := json.Marshal(prefetchStruct)
109+
if err != nil {
110+
return "", errors.Wrap(err, "failed to marshal to JSON")
111+
}
112+
113+
jsonfile, err := os.Create(jsonfilePath)
114+
if err != nil {
115+
return "", errors.Wrapf(err, "failed to create file %s", jsonfilePath)
116+
}
117+
defer jsonfile.Close()
118+
119+
_, err = jsonfile.Write(jsonByte)
120+
if err != nil {
121+
return "", errors.Wrap(err, "error writing JSON to file")
122+
}
58123

59-
delete(p.prefetchMap, image)
124+
return filePath, nil
60125
}

‎pkg/system/system.go

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ package system
99
import (
1010
"encoding/json"
1111
"fmt"
12-
"io"
1312
"net"
1413
"net/http"
1514
"os"
@@ -30,7 +29,6 @@ import (
3029
"github.com/containerd/nydus-snapshotter/pkg/filesystem"
3130
"github.com/containerd/nydus-snapshotter/pkg/manager"
3231
metrics "github.com/containerd/nydus-snapshotter/pkg/metrics/tool"
33-
"github.com/containerd/nydus-snapshotter/pkg/prefetch"
3432
)
3533

3634
const (
@@ -41,7 +39,6 @@ const (
4139
// it's very helpful to check daemon's record in database.
4240
endpointDaemonRecords string = "/api/v1/daemons/records"
4341
endpointDaemonsUpgrade string = "/api/v1/daemons/upgrade"
44-
endpointPrefetch string = "/api/v1/prefetch"
4542
// Provide backend information
4643
endpointGetBackend string = "/api/v1/daemons/{id}/backend"
4744
)
@@ -172,7 +169,6 @@ func (sc *Controller) registerRouter() {
172169
sc.router.HandleFunc(endpointDaemons, sc.describeDaemons()).Methods(http.MethodGet)
173170
sc.router.HandleFunc(endpointDaemonsUpgrade, sc.upgradeDaemons()).Methods(http.MethodPut)
174171
sc.router.HandleFunc(endpointDaemonRecords, sc.getDaemonRecords()).Methods(http.MethodGet)
175-
sc.router.HandleFunc(endpointPrefetch, sc.setPrefetchConfiguration()).Methods(http.MethodPut)
176172
sc.router.HandleFunc(endpointGetBackend, sc.getBackend()).Methods(http.MethodGet)
177173
}
178174

@@ -216,20 +212,6 @@ func (sc *Controller) getBackend() func(w http.ResponseWriter, r *http.Request)
216212
}
217213
}
218214

219-
func (sc *Controller) setPrefetchConfiguration() func(w http.ResponseWriter, r *http.Request) {
220-
return func(w http.ResponseWriter, r *http.Request) {
221-
body, err := io.ReadAll(r.Body)
222-
if err != nil {
223-
log.L.Errorf("Failed to read prefetch list: %v", err)
224-
return
225-
}
226-
if err = prefetch.Pm.SetPrefetchFiles(body); err != nil {
227-
log.L.Errorf("Failed to parse request body: %v", err)
228-
return
229-
}
230-
}
231-
}
232-
233215
func (sc *Controller) describeDaemons() func(w http.ResponseWriter, r *http.Request) {
234216
return func(w http.ResponseWriter, r *http.Request) {
235217
info := make([]daemonInfo, 0, 10)

0 commit comments

Comments
 (0)
Please sign in to comment.