initial release

This commit is contained in:
Siavash Sameni
2025-08-29 08:17:52 +04:00
commit f55b4468b3
8 changed files with 690 additions and 0 deletions

264
internal/fetcher/rpc.go Normal file
View File

@@ -0,0 +1,264 @@
package fetcher
import (
"bytes"
"encoding/json"
"fmt"
"net/http"
"strconv"
"strings"
"time"
)
type RPCClient struct {
// Map of network key -> RPC URL, e.g. {"eth": "https://mainnet.infura.io/v3/...", "arb": "https://arb1...", "base": "https://base-mainnet..."}
RPC map[string]string
http *http.Client
rateLimiter chan struct{} // Rate limiter: 5 TPS
}
func NewRPCClient(rpc map[string]string) *RPCClient {
client := &RPCClient{
RPC: rpc,
http: &http.Client{},
rateLimiter: make(chan struct{}, 5), // 5 TPS rate limiter
}
// Fill the rate limiter initially
for i := 0; i < 5; i++ {
client.rateLimiter <- struct{}{}
}
// Refill rate limiter at 5 TPS (200ms intervals)
go func() {
ticker := time.NewTicker(200 * time.Millisecond)
defer ticker.Stop()
for range ticker.C {
select {
case client.rateLimiter <- struct{}{}:
default:
// Channel is full, skip
}
}
}()
return client
}
// FetchOwnerTokens iterates through tokenIds 0,1,2,... up to maxTokenId
// and calls ownerOf for each to find which ones belong to owner.
// network: key like "eth", "arb", "base".
// contract: 0x...
// owner: 0x...
// maxTokenId: maximum tokenId to check
// FetchAllTokenOwners scans all tokens 0..maxTokenId and returns ownership map
func (c *RPCClient) FetchAllTokenOwners(network, contract string, maxTokenId int, debug bool) (map[string]string, error) {
rpcURL := c.RPC[strings.ToLower(network)]
if rpcURL == "" { return nil, fmt.Errorf("missing RPC for network %s", network) }
contract = strings.ToLower(contract)
owners := make(map[string]string)
consecutiveErrors := 0
maxConsecutiveErrors := 50 // Stop if 50 consecutive tokens don't exist
// Check each tokenId from 0 to maxTokenId
for i := 0; i < maxTokenId; i++ {
tokenOwner, err := c.getOwnerOf(rpcURL, contract, i, debug)
if err != nil {
// If we hit the first invalid token ID, stop immediately as requested
if strings.Contains(strings.ToLower(err.Error()), "invalid token id") {
if debug {
fmt.Printf("DEBUG: Stopping at first invalid token ID %d due to error: %v\n", i, err)
}
break
}
consecutiveErrors++
if debug {
fmt.Printf("DEBUG: Token %d error: %v (consecutive errors: %d)\n", i, err, consecutiveErrors)
}
// Early termination if too many consecutive errors
if consecutiveErrors >= maxConsecutiveErrors {
if debug {
fmt.Printf("DEBUG: Stopping scan at token %d due to %d consecutive errors\n", i, consecutiveErrors)
}
break
}
continue
}
consecutiveErrors = 0 // Reset on successful call
// Store tokenId -> canonicalized owner
canonicalOwner := canonicalizeAddr(tokenOwner)
owners[strconv.Itoa(i)] = canonicalOwner
// Log specific tokens for debugging
if debug && (i == 103 || i == 0 || i == 1 || i%100 == 0 || len(owners) <= 50) {
fmt.Printf("DEBUG: Token %d owner: %s (canonical: %s)\n", i, tokenOwner, canonicalOwner)
}
// Always log token 103 regardless of debug mode
if i == 103 {
fmt.Printf("ALWAYS: Token 103 owner: %s (canonical: %s)\n", tokenOwner, canonicalOwner)
}
}
fmt.Printf("DEBUG: Scanned up to token %d, found %d with owners\n", maxTokenId, len(owners))
return owners, nil
}
func (c *RPCClient) FetchOwnerTokens(network, contract, owner string, maxTokenId int) ([]string, error) {
rpcURL := c.RPC[strings.ToLower(network)]
if rpcURL == "" { return nil, fmt.Errorf("missing RPC for network %s", network) }
contract = strings.ToLower(contract)
owner = strings.ToLower(owner)
var owned []string
// Check each tokenId from 0 to maxTokenId
for i := 0; i < maxTokenId; i++ {
tokenOwner, err := c.getOwnerOf(rpcURL, contract, i, false) // No debug for this method
if err != nil {
// Token might not exist, skip
continue
}
if strings.EqualFold(tokenOwner, owner) {
owned = append(owned, strconv.Itoa(i))
}
}
return owned, nil
}
// canonicalizeAddr normalizes address to 0x + 40 lowercase hex
func canonicalizeAddr(addr string) string {
x := strings.ToLower(strings.TrimSpace(addr))
if strings.HasPrefix(x, "0x") { x = x[2:] }
if len(x) > 40 { x = x[len(x)-40:] }
if len(x) < 40 { x = strings.Repeat("0", 40-len(x)) + x }
return "0x" + x
}
// getOwnerOf calls ownerOf(tokenId) on the contract with rate limiting and retry logic
func (c *RPCClient) getOwnerOf(rpcURL, contract string, tokenId int, debug bool) (string, error) {
maxRetries := 10
baseDelay := 100 * time.Millisecond
for attempt := 0; attempt <= maxRetries; attempt++ {
// Wait for rate limiter
<-c.rateLimiter
if debug && attempt > 0 {
fmt.Printf("DEBUG: Retry attempt %d for token %d\n", attempt, tokenId)
}
result, err := c.makeRPCCall(rpcURL, contract, tokenId, debug)
if err != nil {
// Check if it's a 429 error
if strings.Contains(err.Error(), "429") || strings.Contains(err.Error(), "Too Many Requests") {
if attempt < maxRetries {
// Exponential backoff: 100ms, 200ms, 400ms, 800ms, etc.
delay := time.Duration(1<<attempt) * baseDelay
if debug {
fmt.Printf("DEBUG: Rate limited (429) for token %d, retrying in %v (attempt %d/%d)\n", tokenId, delay, attempt+1, maxRetries)
}
time.Sleep(delay)
continue
} else {
if debug {
fmt.Printf("DEBUG: Max retries exhausted for token %d due to rate limiting\n", tokenId)
}
return "", fmt.Errorf("max retries exhausted due to rate limiting: %w", err)
}
}
// Non-429 error, return immediately
return "", err
}
// Success
return result, nil
}
return "", fmt.Errorf("max retries exhausted")
}
// makeRPCCall performs the actual RPC call without retry logic
func (c *RPCClient) makeRPCCall(rpcURL, contract string, tokenId int, debug bool) (string, error) {
// ownerOf(uint256) selector is 0x6352211e
// Encode tokenId as 32-byte hex
tokenIdHex := fmt.Sprintf("%064x", tokenId)
data := "0x6352211e" + tokenIdHex
payload := map[string]any{
"jsonrpc": "2.0",
"id": 1,
"method": "eth_call",
"params": []any{
map[string]string{
"to": contract,
"data": data,
},
"latest",
},
}
if debug {
fmt.Printf("DEBUG: RPC call tokenId=%d contract=%s method=ownerOf rpc=%s data=%s\n", tokenId, contract, rpcURL, data)
}
b, _ := json.Marshal(payload)
resp, err := c.http.Post(rpcURL, "application/json", bytes.NewReader(b))
if err != nil {
if debug {
fmt.Printf("DEBUG: HTTP error tokenId=%d err=%v\n", tokenId, err)
}
return "", err
}
defer resp.Body.Close()
if resp.StatusCode != 200 {
if debug {
fmt.Printf("DEBUG: HTTP status tokenId=%d status=%s\n", tokenId, resp.Status)
}
return "", fmt.Errorf("rpc status %s", resp.Status)
}
var result struct {
Result string `json:"result"`
Error *struct {
Message string `json:"message"`
} `json:"error"`
}
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
if debug {
fmt.Printf("DEBUG: JSON decode error tokenId=%d err=%v\n", tokenId, err)
}
return "", err
}
if result.Error != nil {
if debug {
fmt.Printf("DEBUG: RPC error tokenId=%d error=%s\n", tokenId, result.Error.Message)
}
return "", fmt.Errorf("rpc error: %s", result.Error.Message)
}
if debug {
fmt.Printf("DEBUG: RPC response tokenId=%d result=%s\n", tokenId, result.Result)
}
// Result is 32-byte address, extract last 20 bytes
if len(result.Result) >= 42 {
owner := "0x" + result.Result[len(result.Result)-40:]
if debug {
fmt.Printf("DEBUG: Extracted owner tokenId=%d owner=%s\n", tokenId, owner)
}
return owner, nil
}
if debug {
fmt.Printf("DEBUG: Invalid response length tokenId=%d result=%s\n", tokenId, result.Result)
}
return "", fmt.Errorf("invalid ownerOf response")
}