
Commit d5cc06a

Merge pull request #55 from gfanton/feat/fix-fetching
2 parents: 9767abc + 1cbc1c4

File tree

7 files changed: +399 −276 lines

entry/entry_io.go

Lines changed: 6 additions & 243 deletions
@@ -2,261 +2,24 @@ package entry // import "berty.tech/go-ipfs-log/entry"
 
 import (
     "context"
-    "sync"
-    "time"
 
     "github.com/ipfs/go-cid"
     core_iface "github.com/ipfs/interface-go-ipfs-core"
 
     "berty.tech/go-ipfs-log/iface"
-    "berty.tech/go-ipfs-log/io/cbor"
 )
 
 type FetchOptions = iface.FetchOptions
 
-// FetchParallel retrieves IPFS log entries.
+// FetchParallel has the same behaviour as FetchAll; it is kept for backward
+// compatibility purposes.
 func FetchParallel(ctx context.Context, ipfs core_iface.CoreAPI, hashes []cid.Cid, options *FetchOptions) []iface.IPFSLogEntry {
-    var (
-        entries        = []iface.IPFSLogEntry(nil)
-        fetchedEntries = make([][]iface.IPFSLogEntry, len(hashes))
-        wg             = sync.WaitGroup{}
-    )
-
-    if options.IO == nil {
-        return nil
-    }
-
-    wg.Add(len(hashes))
-
-    for i, h := range hashes {
-        go func(h cid.Cid, i int) {
-            defer wg.Done()
-
-            fetchedEntries[i] = FetchAll(ctx, ipfs, []cid.Cid{h}, options)
-        }(h, i)
-    }
-
-    wg.Wait()
-
-    for i := range hashes {
-        entries = append(entries, fetchedEntries[i]...)
-    }
-
-    return entries
+    fetcher := NewFetcher(ipfs, options)
+    return fetcher.Fetch(ctx, hashes)
 }
 
 // FetchAll gets entries from their CIDs.
 func FetchAll(ctx context.Context, ipfs core_iface.CoreAPI, hashes []cid.Cid, options *FetchOptions) []iface.IPFSLogEntry {
-    if options.IO == nil {
-        io, err := cbor.IO(&Entry{}, &LamportClock{})
-        if err != nil {
-            return nil
-        }
-
-        options.IO = io
-    }
-
-    var (
-        lock         = sync.Mutex{}
-        result       = []iface.IPFSLogEntry(nil)
-        cache        = map[cid.Cid]bool{}
-        loadingCache = map[cid.Cid]bool{}
-        loadingQueue = map[int][]cid.Cid{0: hashes}
-        running      = 0 // keep track of how many entries are being fetched at any time
-        maxClock     = 0 // keep track of the latest clock time during load
-        minClock     = 0 // keep track of the minimum clock time during load
-        concurrency  = 1
-        length       = -1
-    )
-
-    if options.Length != nil {
-        length = *options.Length
-    }
-
-    if options.Concurrency > concurrency {
-        concurrency = options.Concurrency
-    }
-
-    ctx, cancel := context.WithCancel(ctx)
-    if options.Timeout > 0 {
-        ctx, cancel = context.WithTimeout(context.Background(), time.Second*options.Timeout)
-    }
-
-    defer cancel()
-
-    // Add a multihash to the loading queue
-    addToLoadingQueue := func(e cid.Cid, idx int) {
-        lock.Lock()
-        defer lock.Unlock()
-
-        if _, ok := loadingCache[e]; ok {
-            return
-        }
-
-        loadingCache[e] = true
-
-        for _, otherE := range loadingQueue[idx] {
-            if otherE.Equals(e) {
-                return
-            }
-        }
-
-        loadingQueue[idx] = append(loadingQueue[idx], e)
-    }
-
-    // Get the next items to process from the loading queue
-    getNextFromQueue := func(length int) []cid.Cid {
-        lock.Lock()
-        defer lock.Unlock()
-
-        if length == 0 {
-            length = 1
-        }
-
-        res := []cid.Cid(nil)
-
-        for key := range loadingQueue {
-            nextItems := loadingQueue[key]
-            for len(nextItems) > 0 && len(res) < length {
-                h := nextItems[0]
-                nextItems = nextItems[1:]
-
-                res = append(res, h)
-            }
-
-            loadingQueue[key] = nextItems
-
-            if len(nextItems) == 0 {
-                delete(loadingQueue, key)
-            }
-        }
-
-        return res
-    }
-
-    // Fetch one entry and add it to the results
-    fetchEntry := func(hash cid.Cid) {
-        if !hash.Defined() {
-            return
-        }
-
-        if _, ok := cache[hash]; ok {
-            return
-        }
-
-        addToResults := func(entry iface.IPFSLogEntry) {
-            if !entry.IsValid() {
-                return
-            }
-
-            ts := entry.GetClock().GetTime()
-
-            // Update min/max clocks
-            if maxClock < ts {
-                maxClock = ts
-            }
-
-            if len(result) > 0 {
-                if ts := result[len(result)-1].GetClock().GetTime(); ts < minClock {
-                    minClock = ts
-                }
-            } else {
-                minClock = maxClock
-            }
-
-            isLater := len(result) >= length && ts >= minClock
-            // const calculateIndex = (idx) => maxClock - ts + ((idx + 1) * idx)
-
-            // Add the entry to the results if
-            // 1) we're fetching all entries,
-            // 2) the results are not filled yet, or
-            // 3) the clock of the entry is later than the current known minimum clock time
-            if length < 0 || len(result) < length || isLater {
-                result = append(result, entry)
-                cache[hash] = true
-
-                if options.ProgressChan != nil {
-                    options.ProgressChan <- entry
-                }
-            }
-
-            if length < 0 {
-                // If we're fetching all entries (length == -1), add nexts and refs to the queue
-                for i, h := range entry.GetNext() {
-                    addToLoadingQueue(h, i)
-                }
-
-                for i, h := range entry.GetRefs() {
-                    addToLoadingQueue(h, i)
-                }
-            } else {
-                // If we're fetching entries up to a certain length,
-                // fetch the next one if the result is filled up, to make sure we "check"
-                // the next entry if its clock is later than what we have in the result
-                if _, ok := cache[entry.GetHash()]; len(result) < length || ts > minClock || ts == minClock && !ok {
-                    for _, h := range entry.GetNext() {
-                        addToLoadingQueue(h, maxClock-ts)
-                    }
-                }
-                if len(result)+len(entry.GetRefs()) <= length {
-                    for i, h := range entry.GetRefs() {
-                        addToLoadingQueue(h, maxClock-ts+((i+1)*i))
-                    }
-                }
-            }
-        }
-
-        // Load the entry
-        entry, err := FromMultihashWithIO(ctx, ipfs, hash, options.Provider, options.IO)
-        if err != nil {
-            // TODO: log
-            return
-        }
-
-        // Add it to the results
-        addToResults(entry)
-    }
-
-    // Add entries that we don't need to fetch to the "cache" before we start
-    for _, e := range options.Exclude {
-        cache[e.GetHash()] = true
-    }
-
-    loadingQueueHasItems := func() bool {
-        for _, s := range loadingQueue {
-            if len(s) > 0 {
-                return true
-            }
-        }
-
-        return false
-    }
-
-    go func() {
-        // Does the loading queue have more to process?
-        for loadingQueueHasItems() {
-            if ctx.Err() != nil {
-                break
-            }
-
-            if running < concurrency {
-                nexts := getNextFromQueue(concurrency)
-                running += len(nexts)
-                for _, n := range nexts {
-                    fetchEntry(n)
-                }
-
-                running -= len(nexts)
-            }
-        }
-        cancel()
-    }()
-
-    // Wait for completion or a timeout (if given) in order to
-    // not get stuck loading a block that is unreachable
-    <-ctx.Done()
-
-    return result
+    fetcher := NewFetcher(ipfs, options)
+    return fetcher.Fetch(ctx, hashes)
 }
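
After this change, both exported helpers delegate to the Fetcher type introduced elsewhere in this commit. Below is a minimal caller-side sketch; loadHeads is a hypothetical helper, and it assumes the FetchOptions fields used by the removed implementation (Length, Concurrency) are still honored by the new Fetcher, with NewFetcher and Fetch keeping the signatures shown in the diff.

package main

import (
    "context"

    "github.com/ipfs/go-cid"
    core_iface "github.com/ipfs/interface-go-ipfs-core"

    "berty.tech/go-ipfs-log/entry"
    "berty.tech/go-ipfs-log/iface"
)

// loadHeads shows how a caller might use the refactored fetch path.
// FetchParallel and FetchAll are now equivalent: both build a Fetcher
// and delegate to its Fetch method.
func loadHeads(ctx context.Context, api core_iface.CoreAPI, heads []cid.Cid) []iface.IPFSLogEntry {
    length := 60 // assumption: a nil Length still means "fetch everything"
    return entry.FetchParallel(ctx, api, heads, &entry.FetchOptions{
        Length:      &length, // cap the number of fetched entries
        Concurrency: 16,      // parallel fetches, mirroring the removed code's option
    })
}

One behavioural shift worth noting: the removed FetchParallel fanned out one goroutine per head and concatenated the per-head results, whereas a single shared Fetcher can presumably bound concurrency and deduplicate work across all heads through one loading queue, which is what the deleted FetchAll body previously did inline.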
