Skip to content

Commit 612b081

Browse files
authored
Check archive block height before sending subscribers notification (#403)
* Fix block notification * Fix imports * Start notification feeder in test environment * Start feeder before block procesing * Stop notification feeder when closing test environment * Change order of subscription and adding txs to pool * Adjust correct epoch in test * Feeder scope is already closed in its Stop function
1 parent 1ecb788 commit 612b081

File tree

4 files changed

+107
-19
lines changed

4 files changed

+107
-19
lines changed

gossip/c_block_callbacks.go

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ import (
1818
"github.com/ethereum/go-ethereum/log"
1919
"github.com/ethereum/go-ethereum/metrics"
2020

21-
"github.com/Fantom-foundation/go-opera/evmcore"
2221
"github.com/Fantom-foundation/go-opera/gossip/blockproc/verwatcher"
2322
"github.com/Fantom-foundation/go-opera/gossip/emitter"
2423
"github.com/Fantom-foundation/go-opera/gossip/evmstore"
@@ -398,14 +397,11 @@ func consensusCallbackBeginBlockFn(
398397

399398
// Notify about new block
400399
if feed != nil {
401-
feed.newBlock.Send(evmcore.ChainHeadNotify{Block: evmBlock})
402400
var logs []*types.Log
403401
for _, r := range allReceipts {
404-
for _, l := range r.Logs {
405-
logs = append(logs, l)
406-
}
402+
logs = append(logs, r.Logs...)
407403
}
408-
feed.newLogs.Send(logs)
404+
feed.notifyAboutNewBlock(evmBlock, logs)
409405
}
410406

411407
now := time.Now()

gossip/common_test.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -197,13 +197,15 @@ func newTestEnv(firstEpoch idx.Epoch, validatorsNum idx.Validator, tb testing.TB
197197
em.Start()
198198
}
199199

200+
env.feed.Start(store.evm)
200201
env.blockProcTasks.Start(1)
201202
env.verWatcher.Start()
202203

203204
return env
204205
}
205206

206207
func (env *testEnv) Close() {
208+
env.feed.Stop()
207209
env.verWatcher.Stop()
208210
env.store.Close()
209211
env.tflusher.Stop()
@@ -220,8 +222,6 @@ func (env *testEnv) ApplyTxs(spent time.Duration, txs ...*types.Transaction) (ty
220222

221223
externalReceipts := make(types.Receipts, 0, len(txs))
222224

223-
env.txpool.AddRemotes(txs)
224-
defer env.txpool.(*dummyTxPool).Clear()
225225
newBlocks := make(chan evmcore.ChainHeadNotify)
226226
chainHeadSub := env.feed.SubscribeNewBlock(newBlocks)
227227
mu := &sync.Mutex{}
@@ -246,6 +246,9 @@ func (env *testEnv) ApplyTxs(spent time.Duration, txs ...*types.Transaction) (ty
246246
}
247247
}
248248
}()
249+
env.txpool.AddRemotes(txs)
250+
defer env.txpool.(*dummyTxPool).Clear()
251+
249252
err := env.EmitUntil(func() bool {
250253
mu.Lock()
251254
defer mu.Unlock()

gossip/mps_test.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -575,18 +575,23 @@ func TestMisbehaviourProofsWrongBlockEpoch(t *testing.T) {
575575
require.ErrorIs(err, heavycheck.ErrUnknownEpochBVs)
576576

577577
goodEpochMp := copyMP(correctMp)
578-
goodEpochMp.WrongBlockVote.Pals[0].Val.Epoch = env.store.FindBlockEpoch(goodEpochMp.WrongBlockVote.Block)
579-
goodEpochMp.WrongBlockVote.Pals[1].Val.Epoch = env.store.FindBlockEpoch(goodEpochMp.WrongBlockVote.Block)
578+
// Get epoch number when vote will occur
579+
epoch := env.store.FindBlockEpoch(goodEpochMp.WrongBlockVote.Block)
580+
goodEpochMp.WrongBlockVote.Pals[0].Val.Epoch = epoch - 1
581+
goodEpochMp.WrongBlockVote.Pals[1].Val.Epoch = epoch - 1
580582
err = env.ApplyMPs(nextEpoch, goodEpochMp)
581583
require.ErrorIs(err, heavycheck.ErrWrongPayloadHash)
582584
sign(&goodEpochMp)
583585
err = env.ApplyMPs(nextEpoch, goodEpochMp)
584586
require.NoError(err)
585-
require.Equal(idx.Validator(3), env.store.GetValidators().Len())
587+
// Get epoch state, before validators penalty
588+
epochState := env.store.GetHistoryEpochState(epoch)
589+
require.Equal(idx.Validator(3), epochState.Validators.Len())
586590

587591
err = env.ApplyMPs(nextEpoch, correctMp)
588592
require.NoError(err)
589-
require.Equal(idx.Validator(1), env.store.GetValidators().Len())
593+
epochState = env.store.GetHistoryEpochState(epoch + 1)
594+
require.Equal(idx.Validator(1), epochState.Validators.Len())
590595
require.False(env.store.GetValidators().Exists(1))
591596
require.False(env.store.GetValidators().Exists(2))
592597
}

gossip/service.go

Lines changed: 91 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"fmt"
66
"math/big"
77
"math/rand"
8+
"slices"
89
"sync"
910
"sync/atomic"
1011
"time"
@@ -41,6 +42,7 @@ import (
4142
"github.com/Fantom-foundation/go-opera/gossip/blockproc/sealmodule"
4243
"github.com/Fantom-foundation/go-opera/gossip/blockproc/verwatcher"
4344
"github.com/Fantom-foundation/go-opera/gossip/emitter"
45+
"github.com/Fantom-foundation/go-opera/gossip/evmstore"
4446
"github.com/Fantom-foundation/go-opera/gossip/filters"
4547
"github.com/Fantom-foundation/go-opera/gossip/gasprice"
4648
"github.com/Fantom-foundation/go-opera/gossip/proclogger"
@@ -60,6 +62,15 @@ type ServiceFeed struct {
6062
newEmittedEvent notify.Feed
6163
newBlock notify.Feed
6264
newLogs notify.Feed
65+
66+
incomingUpdates chan<- feedUpdate // < channel to send updates to the background feed loop
67+
stopFeeder chan<- struct{} // < if closed, the background feed loop will stop
68+
feederDone <-chan struct{} // < if closed, the background feed loop has stopped
69+
}
70+
71+
type feedUpdate struct {
72+
block *evmcore.EvmBlock
73+
logs []*types.Log
6374
}
6475

6576
func (f *ServiceFeed) SubscribeNewEpoch(ch chan<- idx.Epoch) notify.Subscription {
@@ -78,6 +89,77 @@ func (f *ServiceFeed) SubscribeNewLogs(ch chan<- []*types.Log) notify.Subscripti
7889
return f.scope.Track(f.newLogs.Subscribe(ch))
7990
}
8091

92+
func (f *ServiceFeed) Start(store *evmstore.Store) {
93+
incoming := make(chan feedUpdate, 1024)
94+
f.incomingUpdates = incoming
95+
stop := make(chan struct{})
96+
done := make(chan struct{})
97+
f.stopFeeder = stop
98+
f.feederDone = done
99+
go func() {
100+
defer close(done)
101+
ticker := time.NewTicker(10 * time.Millisecond)
102+
defer ticker.Stop()
103+
pending := []feedUpdate{}
104+
for {
105+
select {
106+
case <-stop:
107+
return
108+
case update := <-incoming:
109+
pending = append(pending, update)
110+
// sorting could be replaced by a heap or skipped if updates
111+
// are guaranteed to be delivered in order.
112+
slices.SortFunc(pending, func(a, b feedUpdate) int {
113+
return a.block.Number.Cmp(b.block.Number)
114+
})
115+
116+
case <-ticker.C:
117+
}
118+
119+
if len(pending) == 0 {
120+
continue
121+
}
122+
123+
height, empty, err := store.GetArchiveBlockHeight()
124+
if err != nil {
125+
log.Error("failed to get archive block height", "err", err)
126+
continue
127+
}
128+
if empty {
129+
continue
130+
}
131+
for _, update := range pending {
132+
if update.block.Number.Uint64() > height {
133+
break
134+
}
135+
f.newBlock.Send(evmcore.ChainHeadNotify{Block: update.block})
136+
f.newLogs.Send(update.logs)
137+
pending = pending[1:]
138+
}
139+
}
140+
}()
141+
}
142+
143+
func (f *ServiceFeed) notifyAboutNewBlock(
144+
block *evmcore.EvmBlock,
145+
logs []*types.Log,
146+
) {
147+
f.incomingUpdates <- feedUpdate{
148+
block: block,
149+
logs: logs,
150+
}
151+
}
152+
153+
func (f *ServiceFeed) Stop() {
154+
if f.stopFeeder == nil {
155+
return
156+
}
157+
close(f.stopFeeder)
158+
f.stopFeeder = nil
159+
<-f.feederDone
160+
f.scope.Close()
161+
}
162+
81163
type BlockProc struct {
82164
SealerModule blockproc.SealerModule
83165
TxListenerModule blockproc.TxListenerModule
@@ -131,7 +213,7 @@ type Service struct {
131213
blockBusyFlag uint32
132214
eventBusyFlag uint32
133215

134-
feed ServiceFeed
216+
feed ServiceFeed
135217

136218
gpo *gasprice.Oracle
137219

@@ -250,11 +332,11 @@ func newService(config Config, store *Store, blockProc BlockProc, engine lachesi
250332
defer done()
251333
return svc.processEvent(event)
252334
},
253-
SwitchEpochTo: svc.SwitchEpochTo,
254-
BVs: svc.ProcessBlockVotes,
255-
BR: svc.ProcessFullBlockRecord,
256-
EV: svc.ProcessEpochVote,
257-
ER: svc.ProcessFullEpochRecord,
335+
SwitchEpochTo: svc.SwitchEpochTo,
336+
BVs: svc.ProcessBlockVotes,
337+
BR: svc.ProcessFullBlockRecord,
338+
EV: svc.ProcessEpochVote,
339+
ER: svc.ProcessFullEpochRecord,
258340
},
259341
})
260342
if err != nil {
@@ -435,6 +517,8 @@ func (s *Service) Start() error {
435517
if s.store.evm.CheckLiveStateHash(blockState.LastBlock.Idx, blockState.FinalizedStateRoot) != nil {
436518
return errors.New("fullsync isn't possible because state root is missing")
437519
}
520+
// start notification feeder
521+
s.feed.Start(s.store.evm)
438522

439523
// start blocks processor
440524
s.blockProcTasks.Start(1)
@@ -475,7 +559,7 @@ func (s *Service) Stop() error {
475559
s.operaDialCandidates.Close()
476560

477561
s.handler.Stop()
478-
s.feed.scope.Close()
562+
s.feed.Stop()
479563
s.gpo.Stop()
480564
// it's safe to stop tflusher only before locking engineMu
481565
s.tflusher.Stop()

0 commit comments

Comments
 (0)