core/filtermaps: define APIs for map, epoch calculation #31659

Open · wants to merge 2 commits into base: master
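The change set does two things. First, NewFilterMaps validates its parameters through params.sanitize() and returns an error, instead of only calling params.deriveFields(). Second, the inline shift/mask arithmetic on map indices (mapIndex >> logMapsPerEpoch, mapIndex & -baseRowGroupLength, and similar) is replaced by named helpers on Params (mapEpoch, firstEpochMap, lastEpochMap, mapGroupIndex, mapGroupOffset), and baseRowGroupLength is renamed to baseRowGroupSize. The helper definitions themselves are not part of the hunks shown here, so the following standalone sketch reconstructs their presumed semantics from the call sites; treat the bodies as assumptions rather than the PR's actual code.

```go
package filtermaps

// Standalone sketch of the epoch/group helpers whose names appear at the
// call sites below. Not copied from the PR; the bodies are inferred from
// how the helpers are used and from the arithmetic they replace.
type Params struct {
	logMapsPerEpoch  uint32 // log2 of the number of maps per epoch
	baseRowGroupSize uint32 // maps per base row group (power of two)
}

// mapEpoch returns the epoch that the given map index belongs to.
func (p *Params) mapEpoch(mapIndex uint32) uint32 {
	return mapIndex >> p.logMapsPerEpoch
}

// firstEpochMap returns the index of the first map of the given epoch.
func (p *Params) firstEpochMap(epoch uint32) uint32 {
	return epoch << p.logMapsPerEpoch
}

// lastEpochMap returns the index of the last map of the given epoch.
func (p *Params) lastEpochMap(epoch uint32) uint32 {
	return (epoch+1)<<p.logMapsPerEpoch - 1
}

// mapGroupIndex returns the index of the first map of the base row group
// that the given map belongs to.
func (p *Params) mapGroupIndex(mapIndex uint32) uint32 {
	return mapIndex & -p.baseRowGroupSize
}

// mapGroupOffset returns the position of the given map within its group.
func (p *Params) mapGroupOffset(mapIndex uint32) uint32 {
	return mapIndex & (p.baseRowGroupSize - 1)
}
```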
102 changes: 57 additions & 45 deletions core/filtermaps/filtermaps.go
@@ -180,11 +180,14 @@ type filterMapsRange struct {
initialized bool
headIndexed bool
headDelimiter uint64 // zero if headIndexed is false

// if initialized then all maps are rendered in the maps range
maps common.Range[uint32]

// if tailPartialEpoch > 0 then maps between firstRenderedMap-mapsPerEpoch and
// firstRenderedMap-mapsPerEpoch+tailPartialEpoch-1 are rendered
tailPartialEpoch uint32

// if initialized then all log values in the blocks range are fully
// rendered
// blockLvPointers are available in the blocks range
@@ -218,13 +221,15 @@ type Config struct {
}

// NewFilterMaps creates a new FilterMaps and starts the indexer.
func NewFilterMaps(db ethdb.KeyValueStore, initView *ChainView, historyCutoff, finalBlock uint64, params Params, config Config) *FilterMaps {
func NewFilterMaps(db ethdb.KeyValueStore, initView *ChainView, historyCutoff, finalBlock uint64, params Params, config Config) (*FilterMaps, error) {
rs, initialized, err := rawdb.ReadFilterMapsRange(db)
if err != nil || rs.Version != databaseVersion {
rs, initialized = rawdb.FilterMapsRange{}, false
log.Warn("Invalid log index database version; resetting log index")
}
params.deriveFields()
if err := params.sanitize(); err != nil {
return nil, err
}
f := &FilterMaps{
db: db,
closeCh: make(chan struct{}),
@@ -246,7 +251,7 @@ func NewFilterMaps(db ethdb.KeyValueStore, initView *ChainView, historyCutoff, f
tailPartialEpoch: rs.TailPartialEpoch,
},
// deleting last unindexed epoch might have been interrupted by shutdown
cleanedEpochsBefore: max(rs.MapsFirst>>params.logMapsPerEpoch, 1) - 1,
cleanedEpochsBefore: max(params.mapEpoch(rs.MapsFirst), 1) - 1,
historyCutoff: historyCutoff,
finalBlock: finalBlock,
matcherSyncCh: make(chan *FilterMapsMatcherBackend),
@@ -273,7 +278,7 @@ func NewFilterMaps(db ethdb.KeyValueStore, initView *ChainView, historyCutoff, f
"first map", f.indexedRange.maps.First(), "last map", f.indexedRange.maps.Last(),
"head indexed", f.indexedRange.headIndexed)
}
return f
return f, nil
}

// Start starts the indexer.
Expand Down Expand Up @@ -390,7 +395,7 @@ func (f *FilterMaps) init() error {
batch := f.db.NewBatch()
for epoch := range bestLen {
cp := checkpoints[bestIdx][epoch]
f.storeLastBlockOfMap(batch, (uint32(epoch+1)<<f.logMapsPerEpoch)-1, cp.BlockNumber, cp.BlockId)
f.storeLastBlockOfMap(batch, f.Params.lastEpochMap(uint32(epoch)), cp.BlockNumber, cp.BlockId)
f.storeBlockLvPointer(batch, cp.BlockNumber, cp.FirstIndex)
}
fmr := filterMapsRange{
@@ -399,7 +404,7 @@
if bestLen > 0 {
cp := checkpoints[bestIdx][bestLen-1]
fmr.blocks = common.NewRange(cp.BlockNumber+1, 0)
fmr.maps = common.NewRange(uint32(bestLen)<<f.logMapsPerEpoch, 0)
fmr.maps = common.NewRange(f.Params.firstEpochMap(uint32(bestLen)), 0)
}
f.setRange(batch, f.targetView, fmr, false)
return batch.Write()
@@ -562,24 +567,26 @@ func (f *FilterMaps) getFilterMap(mapIndex uint32) (filterMap, error) {

// getFilterMapRow fetches the given filter map row. If baseLayerOnly is true
// then only the first baseRowLength entries are returned.
func (f *FilterMaps) getFilterMapRow(mapIndex, rowIndex uint32, baseLayerOnly bool) (FilterRow, error) {
baseMapRowIndex := f.mapRowIndex(mapIndex&-f.baseRowGroupLength, rowIndex)
baseRows, ok := f.baseRowsCache.Get(baseMapRowIndex)
func (f *FilterMaps) getFilterMapRow(mapIndex, row uint32, baseLayerOnly bool) (FilterRow, error) {
group := f.Params.mapGroupIndex(mapIndex)
rowIndex := f.mapRowIndex(group, row)
baseRows, ok := f.baseRowsCache.Get(rowIndex)
if !ok {
var err error
baseRows, err = rawdb.ReadFilterMapBaseRows(f.db, baseMapRowIndex, f.baseRowGroupLength, f.logMapWidth)
baseRows, err = rawdb.ReadFilterMapBaseRows(f.db, rowIndex, f.baseRowGroupSize, f.logMapWidth)
if err != nil {
return nil, fmt.Errorf("failed to retrieve filter map %d base rows %d: %v", mapIndex, rowIndex, err)
return nil, fmt.Errorf("failed to retrieve filter map %d base rows %d: %v", mapIndex, row, err)
}
f.baseRowsCache.Add(baseMapRowIndex, baseRows)
f.baseRowsCache.Add(rowIndex, baseRows)
}
baseRow := baseRows[mapIndex&(f.baseRowGroupLength-1)]
offset := f.Params.mapGroupOffset(mapIndex)
baseRow := baseRows[offset]
if baseLayerOnly {
return baseRow, nil
}
extRow, err := rawdb.ReadFilterMapExtRow(f.db, f.mapRowIndex(mapIndex, rowIndex), f.logMapWidth)
extRow, err := rawdb.ReadFilterMapExtRow(f.db, f.mapRowIndex(mapIndex, row), f.logMapWidth)
if err != nil {
return nil, fmt.Errorf("failed to retrieve filter map %d extended row %d: %v", mapIndex, rowIndex, err)
return nil, fmt.Errorf("failed to retrieve filter map %d extended row %d: %v", mapIndex, row, err)
}
return FilterRow(append(baseRow, extRow...)), nil
}
@@ -588,53 +595,58 @@ func (f *FilterMaps) getFilterMapRow(mapIndex, rowIndex uint32, baseLayerOnly bo
// indices and a shared row index.
func (f *FilterMaps) storeFilterMapRows(batch ethdb.Batch, mapIndices []uint32, rowIndex uint32, rows []FilterRow) error {
for len(mapIndices) > 0 {
baseMapIndex := mapIndices[0] & -f.baseRowGroupLength
groupLength := 1
for groupLength < len(mapIndices) && mapIndices[groupLength]&-f.baseRowGroupLength == baseMapIndex {
groupLength++
}
if err := f.storeFilterMapRowsOfGroup(batch, mapIndices[:groupLength], rowIndex, rows[:groupLength]); err != nil {
var (
pos = 1
groupIndex = f.Params.mapGroupIndex(mapIndices[0])
)
for pos < len(mapIndices) && f.Params.mapGroupIndex(mapIndices[pos]) == groupIndex {
pos++
}
if err := f.storeFilterMapRowsOfGroup(batch, mapIndices[:pos], rowIndex, rows[:pos]); err != nil {
return err
}
mapIndices, rows = mapIndices[groupLength:], rows[groupLength:]
mapIndices, rows = mapIndices[pos:], rows[pos:]
}
return nil
}

// storeFilterMapRowsOfGroup stores a set of filter map rows at map indices
// belonging to the same base row group.
func (f *FilterMaps) storeFilterMapRowsOfGroup(batch ethdb.Batch, mapIndices []uint32, rowIndex uint32, rows []FilterRow) error {
baseMapIndex := mapIndices[0] & -f.baseRowGroupLength
baseMapRowIndex := f.mapRowIndex(baseMapIndex, rowIndex)
var baseRows [][]uint32
if uint32(len(mapIndices)) != f.baseRowGroupLength { // skip base rows read if all rows are replaced
func (f *FilterMaps) storeFilterMapRowsOfGroup(batch ethdb.Batch, mapIndices []uint32, row uint32, rows []FilterRow) error {
var (
baseRows [][]uint32
groupIndex = f.Params.mapGroupIndex(mapIndices[0])
rowIndex = f.mapRowIndex(groupIndex, row)
)
if uint32(len(mapIndices)) != f.baseRowGroupSize { // skip base rows read if all rows are replaced
var ok bool
baseRows, ok = f.baseRowsCache.Get(baseMapRowIndex)
baseRows, ok = f.baseRowsCache.Get(rowIndex)
if !ok {
var err error
baseRows, err = rawdb.ReadFilterMapBaseRows(f.db, baseMapRowIndex, f.baseRowGroupLength, f.logMapWidth)
baseRows, err = rawdb.ReadFilterMapBaseRows(f.db, rowIndex, f.baseRowGroupSize, f.logMapWidth)
if err != nil {
return fmt.Errorf("failed to retrieve filter map %d base rows %d for modification: %v", mapIndices[0]&-f.baseRowGroupLength, rowIndex, err)
return fmt.Errorf("failed to retrieve filter map %d base rows %d for modification: %v", f.Params.mapGroupIndex(mapIndices[0]), row, err)
}
}
} else {
baseRows = make([][]uint32, f.baseRowGroupLength)
baseRows = make([][]uint32, f.baseRowGroupSize)
}
for i, mapIndex := range mapIndices {
if mapIndex&-f.baseRowGroupLength != baseMapIndex {
panic("mapIndices are not in the same base row group")
if f.Params.mapGroupIndex(mapIndex) != groupIndex {
return fmt.Errorf("maps are not in the same base row group, index: %d, group: %d", mapIndex, groupIndex)
}
baseRow := []uint32(rows[i])
var extRow FilterRow
if uint32(len(rows[i])) > f.baseRowLength {
extRow = baseRow[f.baseRowLength:]
baseRow = baseRow[:f.baseRowLength]
}
baseRows[mapIndex&(f.baseRowGroupLength-1)] = baseRow
rawdb.WriteFilterMapExtRow(batch, f.mapRowIndex(mapIndex, rowIndex), extRow, f.logMapWidth)
offset := f.Params.mapGroupOffset(mapIndex)
baseRows[offset] = baseRow
rawdb.WriteFilterMapExtRow(batch, f.mapRowIndex(mapIndex, row), extRow, f.logMapWidth)
}
f.baseRowsCache.Add(baseMapRowIndex, baseRows)
rawdb.WriteFilterMapBaseRows(batch, baseMapRowIndex, baseRows, f.logMapWidth)
f.baseRowsCache.Add(rowIndex, baseRows)
rawdb.WriteFilterMapBaseRows(batch, rowIndex, baseRows, f.logMapWidth)
return nil
}

@@ -644,9 +656,9 @@ func (f *FilterMaps) storeFilterMapRowsOfGroup(batch ethdb.Batch, mapIndices []u
// same data proximity reasons it is also suitable for database representation.
// See also:
// https://eips.ethereum.org/EIPS/eip-7745#hash-tree-structure
func (f *FilterMaps) mapRowIndex(mapIndex, rowIndex uint32) uint64 {
func (f *FilterMaps) mapRowIndex(mapIndex, row uint32) uint64 {
epochIndex, mapSubIndex := mapIndex>>f.logMapsPerEpoch, mapIndex&(f.mapsPerEpoch-1)
return (uint64(epochIndex)<<f.logMapHeight+uint64(rowIndex))<<f.logMapsPerEpoch + uint64(mapSubIndex)
return (uint64(epochIndex)<<f.logMapHeight+uint64(row))<<f.logMapsPerEpoch + uint64(mapSubIndex)
}

// getBlockLvPointer returns the starting log value index where the log values
@@ -722,12 +734,12 @@ func (f *FilterMaps) deleteTailEpoch(epoch uint32) (bool, error) {
defer f.indexLock.Unlock()

// determine epoch boundaries
firstMap := epoch << f.logMapsPerEpoch
lastBlock, _, err := f.getLastBlockOfMap(firstMap + f.mapsPerEpoch - 1)
lastBlock, _, err := f.getLastBlockOfMap(f.Params.lastEpochMap(epoch))
if err != nil {
return false, fmt.Errorf("failed to retrieve last block of deleted epoch %d: %v", epoch, err)
}
var firstBlock uint64
firstMap := f.Params.firstEpochMap(epoch)
if epoch > 0 {
firstBlock, _, err = f.getLastBlockOfMap(firstMap - 1)
if err != nil {
@@ -738,8 +750,8 @@ func (f *FilterMaps) deleteTailEpoch(epoch uint32) (bool, error) {
// update rendered range if necessary
var (
fmr = f.indexedRange
firstEpoch = f.indexedRange.maps.First() >> f.logMapsPerEpoch
afterLastEpoch = (f.indexedRange.maps.AfterLast() + f.mapsPerEpoch - 1) >> f.logMapsPerEpoch
firstEpoch = f.Params.mapEpoch(f.indexedRange.maps.First())
afterLastEpoch = f.Params.mapEpoch(f.indexedRange.maps.AfterLast() + f.mapsPerEpoch - 1)
)
if f.indexedRange.tailPartialEpoch != 0 && firstEpoch > 0 {
firstEpoch--
@@ -751,7 +763,7 @@ func (f *FilterMaps) deleteTailEpoch(epoch uint32) (bool, error) {
// first fully or partially rendered epoch and there is at least one
// rendered map in the next epoch; remove from indexed range
fmr.tailPartialEpoch = 0
fmr.maps.SetFirst((epoch + 1) << f.logMapsPerEpoch)
fmr.maps.SetFirst(f.Params.firstEpochMap(epoch + 1))
fmr.blocks.SetFirst(lastBlock + 1)
f.setRange(f.db, f.indexedView, fmr, false)
default:
@@ -826,7 +838,7 @@ func (f *FilterMaps) exportCheckpoints() {
w.WriteString("[\n")
comma := ","
for epoch := uint32(0); epoch < epochCount; epoch++ {
lastBlock, lastBlockId, err := f.getLastBlockOfMap((epoch+1)<<f.logMapsPerEpoch - 1)
lastBlock, lastBlockId, err := f.getLastBlockOfMap(f.Params.lastEpochMap(epoch))
if err != nil {
log.Error("Error fetching last block of epoch", "epoch", epoch, "error", err)
return
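As a worked example (not part of the PR), using the testParams values from indexer_test.go further down (logMapsPerEpoch: 4, baseRowGroupSize: 4), the new helpers and the old shift/mask expressions they replace should yield identical values:

```go
package main

import "fmt"

// Hypothetical equivalence check, assuming the helper semantics sketched
// above and the test parameters logMapsPerEpoch = 4, baseRowGroupSize = 4.
func main() {
	var (
		logMapsPerEpoch  uint32 = 4 // 16 maps per epoch
		baseRowGroupSize uint32 = 4 // 4 maps per base row group
		epoch            uint32 = 3
		mapIndex         uint32 = 53
	)
	fmt.Println(epoch << logMapsPerEpoch)          // firstEpochMap(3):   48
	fmt.Println((epoch+1)<<logMapsPerEpoch - 1)    // lastEpochMap(3):    63
	fmt.Println(mapIndex >> logMapsPerEpoch)       // mapEpoch(53):       3
	fmt.Println(mapIndex & -baseRowGroupSize)      // mapGroupIndex(53):  52
	fmt.Println(mapIndex & (baseRowGroupSize - 1)) // mapGroupOffset(53): 1
}
```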
12 changes: 6 additions & 6 deletions core/filtermaps/indexer.go
@@ -274,7 +274,7 @@ func (f *FilterMaps) tryIndexHead() error {
// is changed.
func (f *FilterMaps) tryIndexTail() (bool, error) {
for {
firstEpoch := f.indexedRange.maps.First() >> f.logMapsPerEpoch
firstEpoch := f.Params.mapEpoch(f.indexedRange.maps.First())
if firstEpoch == 0 || !f.needTailEpoch(firstEpoch-1) {
break
}
@@ -352,7 +352,7 @@ func (f *FilterMaps) tryIndexTail() (bool, error) {
// Note that unindexing is very quick as it only removes continuous ranges of
// data from the database and is also called while running head indexing.
func (f *FilterMaps) tryUnindexTail() (bool, error) {
firstEpoch := f.indexedRange.maps.First() >> f.logMapsPerEpoch
firstEpoch := f.Params.mapEpoch(f.indexedRange.maps.First())
if f.indexedRange.tailPartialEpoch > 0 && firstEpoch > 0 {
firstEpoch--
}
@@ -385,11 +385,11 @@ func (f *FilterMaps) tryUnindexTail() (bool, error) {
// needTailEpoch returns true if the given tail epoch needs to be kept
// according to the current tail target, false if it can be removed.
func (f *FilterMaps) needTailEpoch(epoch uint32) bool {
firstEpoch := f.indexedRange.maps.First() >> f.logMapsPerEpoch
firstEpoch := f.Params.mapEpoch(f.indexedRange.maps.First())
if epoch > firstEpoch {
return true
}
if (epoch+1)<<f.logMapsPerEpoch >= f.indexedRange.maps.AfterLast() {
if f.Params.firstEpochMap(epoch+1) >= f.indexedRange.maps.AfterLast() {
return true
}
if epoch+1 < firstEpoch {
@@ -398,7 +398,7 @@ func (f *FilterMaps) needTailEpoch(epoch uint32) bool {
var lastBlockOfPrevEpoch uint64
if epoch > 0 {
var err error
lastBlockOfPrevEpoch, _, err = f.getLastBlockOfMap(epoch<<f.logMapsPerEpoch - 1)
lastBlockOfPrevEpoch, _, err = f.getLastBlockOfMap(f.Params.lastEpochMap(epoch - 1))
if err != nil {
log.Error("Could not get last block of previous epoch", "epoch", epoch-1, "error", err)
return epoch >= firstEpoch
@@ -407,7 +407,7 @@ func (f *FilterMaps) needTailEpoch(epoch uint32) bool {
if f.historyCutoff > lastBlockOfPrevEpoch {
return false
}
lastBlockOfEpoch, _, err := f.getLastBlockOfMap((epoch+1)<<f.logMapsPerEpoch - 1)
lastBlockOfEpoch, _, err := f.getLastBlockOfMap(f.Params.lastEpochMap(epoch))
if err != nil {
log.Error("Could not get last block of epoch", "epoch", epoch, "error", err)
return epoch >= firstEpoch
4 changes: 2 additions & 2 deletions core/filtermaps/indexer_test.go
@@ -41,7 +41,7 @@ var testParams = Params{
logMapWidth: 24,
logMapsPerEpoch: 4,
logValuesPerMap: 4,
baseRowGroupLength: 4,
baseRowGroupSize: 4,
baseRowLengthRatio: 2,
logLayerDiff: 2,
}
@@ -318,7 +318,7 @@ func (ts *testSetup) setHistory(history uint64, noHistory bool) {
History: history,
Disabled: noHistory,
}
ts.fm = NewFilterMaps(ts.db, view, 0, 0, ts.params, config)
ts.fm, _ = NewFilterMaps(ts.db, view, 0, 0, ts.params, config)
ts.fm.testDisableSnapshots = ts.testDisableSnapshots
ts.fm.Start()
}
2 changes: 1 addition & 1 deletion core/filtermaps/map_renderer.go
@@ -282,7 +282,7 @@ func (r *mapRenderer) run(stopCb func() bool, writeCb func()) (bool, error) {
// map finished
r.finishedMaps[r.currentMap.mapIndex] = r.currentMap
r.finished.SetLast(r.finished.AfterLast())
if len(r.finishedMaps) >= maxMapsPerBatch || r.finished.AfterLast()&(r.f.baseRowGroupLength-1) == 0 {
if len(r.finishedMaps) >= maxMapsPerBatch || r.f.Params.mapGroupOffset(r.finished.AfterLast()) == 0 {
if err := r.writeFinishedMaps(stopCb); err != nil {
return false, err
}
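The renderer above now flushes finished maps when mapGroupOffset(r.finished.AfterLast()) == 0. Since AfterLast() is one past the last finished map, an offset of zero means the finished range ends exactly on a base row group boundary, at which point the whole group can be written out together. A minimal illustration, assuming the offset semantics sketched earlier:

```go
package main

import "fmt"

// Hypothetical illustration (not PR code) of the flush condition in
// mapRenderer.run, assuming baseRowGroupSize = 4 and
// mapGroupOffset(x) = x & (baseRowGroupSize - 1).
func main() {
	var baseRowGroupSize uint32 = 4
	for afterLast := uint32(1); afterLast <= 8; afterLast++ {
		flush := afterLast&(baseRowGroupSize-1) == 0 // mapGroupOffset(afterLast) == 0
		fmt.Println(afterLast, flush)                // true at 4 and 8: groups 0..3 and 4..7 complete
	}
}
```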
4 changes: 2 additions & 2 deletions core/filtermaps/matcher.go
@@ -838,7 +838,7 @@ func (m *matchSequenceInstance) dropNext(mapIndex uint32) bool {
// results at mapIndex and mapIndex+1. Note that acquiring nextNextRes may be
// skipped and it can be substituted with an empty list if baseRes has no potential
// matches that could be sequence matched with anything that could be in nextNextRes.
func (params *Params) matchResults(mapIndex uint32, offset uint64, baseRes, nextRes potentialMatches) potentialMatches {
func (p *Params) matchResults(mapIndex uint32, offset uint64, baseRes, nextRes potentialMatches) potentialMatches {
if nextRes == nil || (baseRes != nil && len(baseRes) == 0) {
// if nextRes is a wild card or baseRes is empty then the sequence matcher
// result equals baseRes.
@@ -848,7 +848,7 @@ func (params *Params) matchResults(mapIndex uint32, offset uint64, baseRes, next
// if baseRes is a wild card or nextRes is empty then the sequence matcher
// result is the items of nextRes with a negative offset applied.
result := make(potentialMatches, 0, len(nextRes))
min := (uint64(mapIndex) << params.logValuesPerMap) + offset
min := (uint64(mapIndex) << p.logValuesPerMap) + offset
for _, v := range nextRes {
if v >= min {
result = append(result, v-offset)