Skip to content

Commit 5fcbe58

Browse files
committed
x
1 parent adaea04 commit 5fcbe58

File tree

24 files changed

+446
-115
lines changed

24 files changed

+446
-115
lines changed

alex_questions.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
# direct TX
2+
3+
- What format can we expect for the direct TX on the DA? some wrapper type would be useful to unpack and distinct from
4+
random bytes
5+
- Should we always include direct TX although they may be duplicates to mempool TX? spike: yes
6+
- Should we fill the block space with directTX if possible or reserve space for mempool TX
7+
8+
## Smarter sequencer
9+
build blocks by max bytes from the request rather than returning what was added as a batch before from the syncer.

apps/evm/single/cmd/run.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ var RunCmd = &cobra.Command{
7272
logger,
7373
datastore,
7474
100, // todo (Alex): what is a good value?
75+
nodeConfig.ForcedInclusion,
7576
)
7677

7778
nodeKey, err := key.LoadNodeKey(filepath.Dir(nodeConfig.ConfigPath()))

apps/evm/single/go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ require (
5454
github.com/buger/goterm v1.0.4 // indirect
5555
github.com/celestiaorg/go-header v0.6.6 // indirect
5656
github.com/celestiaorg/go-libp2p-messenger v0.2.2 // indirect
57-
github.com/celestiaorg/go-square/v2 v2.2.0 // indirect
57+
github.com/celestiaorg/go-square/v2 v2.3.1 // indirect
5858
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
5959
github.com/cespare/xxhash/v2 v2.3.0 // indirect
6060
github.com/compose-spec/compose-go/v2 v2.6.0 // indirect

apps/evm/single/go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -107,8 +107,8 @@ github.com/celestiaorg/go-header v0.6.6 h1:17GvSXU/w8L1YWHZP4pYm9/4YHA8iy5Ku2wTE
107107
github.com/celestiaorg/go-header v0.6.6/go.mod h1:RdnlTmsyuNerztNiJiQE5G/EGEH+cErhQ83xNjuGcaQ=
108108
github.com/celestiaorg/go-libp2p-messenger v0.2.2 h1:osoUfqjss7vWTIZrrDSy953RjQz+ps/vBFE7bychLEc=
109109
github.com/celestiaorg/go-libp2p-messenger v0.2.2/go.mod h1:oTCRV5TfdO7V/k6nkx7QjQzGrWuJbupv+0o1cgnY2i4=
110-
github.com/celestiaorg/go-square/v2 v2.2.0 h1:zJnUxCYc65S8FgUfVpyG/osDcsnjzo/JSXw/Uwn8zp4=
111-
github.com/celestiaorg/go-square/v2 v2.2.0/go.mod h1:j8kQUqJLYtcvCQMQV6QjEhUdaF7rBTXF74g8LbkR0Co=
110+
github.com/celestiaorg/go-square/v2 v2.3.1 h1:CDdiQ+QkKPOQEcyDPODgP/PbAEzqUcftsohCPcbvsnw=
111+
github.com/celestiaorg/go-square/v2 v2.3.1/go.mod h1:6M2txj0j6dkoE+cgwyG0EqrEPhbZpM2R1lsWEopMIBc=
112112
github.com/celestiaorg/utils v0.1.0 h1:WsP3O8jF7jKRgLNFmlDCwdThwOFMFxg0MnqhkLFVxPo=
113113
github.com/celestiaorg/utils v0.1.0/go.mod h1:vQTh7MHnvpIeCQZ2/Ph+w7K1R2UerDheZbgJEJD2hSU=
114114
github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8=

block/direct_tx.go

Lines changed: 1 addition & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,8 @@ import (
55
"crypto/sha256"
66
"errors"
77
"github.com/evstack/ev-node/types"
8-
"sync"
98
)
109

11-
type ForcedInclusionConfig struct {
12-
MaxInclusionDelay uint64 // Max inclusion time in DA block time units
13-
MinDADelay uint64 // Minimum number of DA blocks before including a direct tx
14-
}
15-
1610
type DirectTransaction struct {
1711
TxHash types.Hash
1812
FirstSeenDAHeight uint64 // DA block time when the tx was seen
@@ -21,15 +15,6 @@ type DirectTransaction struct {
2115
TX []byte
2216
}
2317

24-
type DirectTxTracker struct {
25-
config ForcedInclusionConfig
26-
mu sync.RWMutex
27-
//txs map[string]DirectTransaction // hash -> tx
28-
txs []DirectTransaction // ordered by da height and position in blob
29-
latestSeenDABlockTime uint64
30-
latestDAHeight uint64
31-
}
32-
3318
func (m *Manager) handlePotentialDirectTXs(ctx context.Context, bz []byte, daHeight uint64) bool {
3419
var unsignedData types.Data // todo (Alex): we need some type to separate from noise
3520
err := unsignedData.UnmarshalBinary(bz)
@@ -65,40 +50,9 @@ func (m *Manager) handlePotentialDirectTXs(ctx context.Context, bz []byte, daHei
6550
Included: false,
6651
IncludedAt: 0,
6752
}
68-
m.directTXTracker.mu.Lock()
69-
m.directTXTracker.txs = append(m.directTXTracker.txs, d)
70-
m.directTXTracker.mu.Unlock()
53+
_ = d
7154
}
7255
return true
7356
}
7457

7558
var ErrMissingDirectTx = errors.New("missing direct tx")
76-
77-
func (m *Manager) getPendingDirectTXs(_ context.Context, maxBytes int) ([][]byte, error) {
78-
remaining := maxBytes
79-
currBlockTime := m.directTXTracker.latestSeenDABlockTime
80-
var res [][]byte
81-
82-
m.directTXTracker.mu.Lock()
83-
defer m.directTXTracker.mu.Unlock()
84-
for _, tx := range m.directTXTracker.txs {
85-
if tx.Included {
86-
continue
87-
}
88-
89-
if currBlockTime-tx.FirstSeenDAHeight > m.directTXTracker.config.MaxInclusionDelay {
90-
// should have been forced included already.
91-
// what should we do now? stop the world
92-
return nil, ErrMissingDirectTx
93-
}
94-
if m.directTXTracker.latestDAHeight-tx.FirstSeenDAHeight < m.directTXTracker.config.MinDADelay {
95-
// we can stop here as following tx are newer
96-
break
97-
}
98-
if len(tx.TX) > remaining {
99-
break
100-
}
101-
res = append(res, tx.TX)
102-
}
103-
return res, nil
104-
}

block/direct_tx_reaper.go

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -44,12 +44,16 @@ func NewDirectTxReaper(
4444
interval time.Duration,
4545
logger log.EventLogger,
4646
store ds.Batching,
47+
daStartHeight uint64,
4748
) *DirectTxReaper {
49+
if daStartHeight == 0 {
50+
daStartHeight = 1
51+
}
4852
if interval <= 0 {
4953
interval = 100 * time.Millisecond
5054
}
5155
daHeight := new(atomic.Uint64)
52-
daHeight.Store(1)
56+
daHeight.Store(daStartHeight)
5357
return &DirectTxReaper{
5458
da: da,
5559
sequencer: sequencer,
@@ -80,7 +84,7 @@ func (r *DirectTxReaper) Start(ctx context.Context) {
8084
return
8185
case <-ticker.C:
8286
daHeight := r.daHeight.Load()
83-
if err := r.SubmitTxs(daHeight); err != nil {
87+
if err := r.retrieveDirectTXs(daHeight); err != nil {
8488
if strings.Contains(err.Error(), coreda.ErrHeightFromFuture.Error()) {
8589
r.logger.Debug("IDs not found at height", "height", daHeight)
8690
} else {
@@ -94,8 +98,8 @@ func (r *DirectTxReaper) Start(ctx context.Context) {
9498
}
9599
}
96100

97-
// SubmitTxs retrieves direct transactions from the DA layer and submits them to the sequencer.
98-
func (r *DirectTxReaper) SubmitTxs(daHeight uint64) error {
101+
// retrieveDirectTXs retrieves direct transactions from the DA layer and submits them to the sequencer.
102+
func (r *DirectTxReaper) retrieveDirectTXs(daHeight uint64) error {
99103
// Get the latest DA height
100104
// Get all blob IDs at the current DA height
101105
result, err := r.da.GetIDs(r.ctx, daHeight, nil)
@@ -115,7 +119,7 @@ func (r *DirectTxReaper) SubmitTxs(daHeight uint64) error {
115119
}
116120
r.logger.Debug("Blobs found at height", "height", daHeight, "count", len(blobs))
117121

118-
var newTxs [][]byte
122+
var newTxs []sequencer.DirectTX
119123
for _, blob := range blobs {
120124
r.logger.Debug("Processing blob data")
121125

@@ -134,18 +138,21 @@ func (r *DirectTxReaper) SubmitTxs(daHeight uint64) error {
134138
}
135139

136140
// Process each transaction in the blob
137-
for _, tx := range data.Txs {
141+
for i, tx := range data.Txs {
138142
txHash := hashTx(tx)
139143
has, err := r.seenStore.Has(r.ctx, ds.NewKey(txHash))
140144
if err != nil {
141145
return fmt.Errorf("check seenStore: %w", err)
142146
}
143147
if !has {
144-
newTxs = append(newTxs, tx)
148+
newTxs = append(newTxs, sequencer.DirectTX{
149+
TX: tx,
150+
ID: result.IDs[i],
151+
FirstSeenHeight: daHeight,
152+
FirstSeenTime: result.Timestamp.Unix(),
153+
})
145154
}
146155
}
147-
// todo: apply checks"
148-
// DA header time: result.Timestamp
149156
}
150157

151158
if len(newTxs) == 0 {
@@ -154,13 +161,13 @@ func (r *DirectTxReaper) SubmitTxs(daHeight uint64) error {
154161
}
155162

156163
r.logger.Debug("Submitting direct txs to sequencer", "txCount", len(newTxs))
157-
err = r.sequencer.SubmitDirectTxs(r.ctx, newTxs)
164+
err = r.sequencer.SubmitDirectTxs(r.ctx, newTxs...)
158165
if err != nil {
159166
return fmt.Errorf("submit direct txs to sequencer: %w", err)
160167
}
161168
// Mark the transactions as seen
162-
for _, tx := range newTxs {
163-
txHash := hashTx(tx)
169+
for _, v := range newTxs {
170+
txHash := hashTx(v.TX)
164171
if err := r.seenStore.Put(r.ctx, ds.NewKey(txHash), []byte{1}); err != nil {
165172
return fmt.Errorf("persist seen tx: %w", err)
166173
}

block/direct_tx_reaper_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ func TestSubmitTxs(t *testing.T) {
153153
reaper := createTestDirectTxReaper(t, mockDA, mockSequencer, chainID)
154154
tt.setupMocks(mockDA, mockSequencer, reaper)
155155

156-
err := reaper.SubmitTxs(1)
156+
err := reaper.retrieveDirectTXs(1)
157157

158158
if tt.expectedError {
159159
assert.Error(t, err)
@@ -208,7 +208,7 @@ func createTestDirectTxReaper(
208208
ctx,
209209
mockDA,
210210
mockSequencer,
211-
nil, // Manager is not used in SubmitTxs
211+
nil, // Manager is not used in retrieveDirectTXs
212212
chainID,
213213
time.Second,
214214
logger,

block/manager.go

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"encoding/hex"
99
"errors"
1010
"fmt"
11+
"github.com/celestiaorg/go-square/v2/share"
1112
"path/filepath"
1213
"sync"
1314
"sync/atomic"
@@ -168,8 +169,6 @@ type Manager struct {
168169
// validatorHasherProvider is used to provide the validator hash for the header.
169170
// It is used to set the validator hash in the header.
170171
validatorHasherProvider types.ValidatorHasherProvider
171-
172-
directTXTracker DirectTxTracker
173172
}
174173

175174
// getInitialState tries to load lastState from Store, and if it's not available it reads genesis.
@@ -402,7 +401,6 @@ func NewManager(
402401
txNotifyCh: make(chan struct{}, 1), // Non-blocking channel
403402
signaturePayloadProvider: managerOpts.SignaturePayloadProvider,
404403
validatorHasherProvider: managerOpts.ValidatorHasherProvider,
405-
directTXTracker: DirectTxTracker{txs: make([]DirectTransaction, 0)},
406404
}
407405

408406
// initialize da included height
@@ -546,14 +544,21 @@ func (m *Manager) GetExecutor() coreexecutor.Executor {
546544
return m.exec
547545
}
548546

547+
const ( // copied from da/jsonclient/internal
548+
defaultGovMaxSquareSize = 64
549+
defaultMaxBytes = defaultGovMaxSquareSize * defaultGovMaxSquareSize * share.ContinuationSparseShareContentSize
550+
)
551+
549552
func (m *Manager) retrieveBatch(ctx context.Context) (*BatchData, error) {
550553
m.logger.Debug("Attempting to retrieve next batch",
551554
"chainID", m.genesis.ChainID,
552555
"lastBatchData", m.lastBatchData)
553556

554557
req := coresequencer.GetNextBatchRequest{
555-
Id: []byte(m.genesis.ChainID),
556-
LastBatchData: m.lastBatchData,
558+
DAIncludedHeight: m.daIncludedHeight.Load(),
559+
Id: []byte(m.genesis.ChainID),
560+
LastBatchData: m.lastBatchData,
561+
MaxBytes: defaultMaxBytes, // todo (Alex): do we need to reserve some space for headers and other data?
557562
}
558563

559564
res, err := m.sequencer.GetNextBatch(ctx, req)

block/reaper_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ func TestReaper_SubmitTxs_Success(t *testing.T) {
3333
// Prepare transaction and its hash
3434
tx := []byte("tx1")
3535

36-
// Mock interactions for the first SubmitTxs call
36+
// Mock interactions for the first retrieveDirectTXs call
3737
mockExec.On("GetTxs", mock.Anything).Return([][]byte{tx}, nil).Once()
3838
submitReqMatcher := mock.MatchedBy(func(req coresequencer.SubmitBatchTxsRequest) bool {
3939
return string(req.Id) == chainID && len(req.Batch.Transactions) == 1 && string(req.Batch.Transactions[0]) == string(tx)

core/sequencer/direct_tx.go

Lines changed: 46 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,10 @@ package sequencer
22

33
import (
44
"context"
5+
"crypto/sha256"
6+
"encoding/binary"
7+
"fmt"
8+
"hash"
59
)
610

711
// DirectTxSequencer is an interface for sequencers that can handle direct transactions.
@@ -11,5 +15,46 @@ type DirectTxSequencer interface {
1115

1216
// SubmitDirectTxs adds direct transactions to the sequencer.
1317
// This method is called by the DirectTxReaper.
14-
SubmitDirectTxs(ctx context.Context, txs [][]byte) error
18+
SubmitDirectTxs(ctx context.Context, txs ...DirectTX) error
19+
}
20+
21+
type DirectTX struct {
22+
TX []byte
23+
ID []byte
24+
FirstSeenHeight uint64
25+
// unix time
26+
FirstSeenTime int64
27+
}
28+
29+
// ValidateBasic performs basic validation of DirectTX fields
30+
func (d *DirectTX) ValidateBasic() error {
31+
if len(d.TX) == 0 {
32+
return fmt.Errorf("tx cannot be empty")
33+
}
34+
if len(d.ID) == 0 {
35+
return fmt.Errorf("id cannot be empty")
36+
}
37+
if d.FirstSeenHeight == 0 {
38+
return fmt.Errorf("first seen height cannot be zero")
39+
}
40+
if d.FirstSeenTime == 0 {
41+
return fmt.Errorf("first seen time cannot be zero")
42+
}
43+
return nil
44+
}
45+
46+
// Hash Hash on the data
47+
func (d *DirectTX) Hash() ([]byte, error) {
48+
hasher := sha256.New()
49+
hashWriteInt(hasher, len(d.ID))
50+
hasher.Write(d.ID)
51+
hashWriteInt(hasher, len(d.TX))
52+
hasher.Write(d.TX)
53+
return hasher.Sum(nil), nil
54+
}
55+
56+
func hashWriteInt(hasher hash.Hash, data int) {
57+
txLen := make([]byte, 8) // 8 bytes for uint64
58+
binary.BigEndian.PutUint64(txLen, uint64(data))
59+
hasher.Write(txLen)
1560
}

0 commit comments

Comments
 (0)