Ronilson Alves

Originally published at ronilsonalves.com

Building an Ethereum wallet watcher using concurrent programming in GoLang

Cover photo by Mika Baumeister / Unsplash

In my last article I talked about how simple it is to build applications that use concurrent programming in Go. We saw how easy concurrency is to implement and how efficient it can be, because the language gives developers powerful tools to take full advantage of it. In this article we will take a practical, step-by-step look at how to build an Ethereum wallet watcher that takes advantage of this strength of GoLang.

Context

Web3 has been a topic of great interest to me ever since I returned to my programming studies. I am an enthusiast of blockchain, the technology behind most Web3 products, which has recently been adopted in more traditional solutions as well. Some wallets have their private keys publicly exposed, most of them because they ship with development kits that let you run a test node in a local environment. Some users, through carelessness or lack of attention, end up sending funds to the address of one of these wallets on the main network (in production, so to speak).

Goal

We will build a service in GoLang that watches Ethereum wallets and, whenever one of them receives an incoming balance, tries to perform an outgoing transaction to collect those funds.

To further exemplify the use of concurrent programming, we will also provide an API for querying information about an Ethereum address.

In summary, we will run our wallet watcher service side by side with a REST API. Let's go?

Before we begin

The source code of this project is on my GitHub. You can clone it with git clone and run it on your machine; don't forget to create a .env file in the root of the project, following the .env.example:

git clone https://github.com/ronilsonalves/go-wallet-watcher.git

If you also want to build the REST API for queries, you will need an API key from Etherscan.io. To get one, create an account and request a free key at https://etherscan.io/myapikey

Getting started

First, choose the directory where we will save our project and initialize a Go module in it. In the terminal, type:

go mod init 'project-name'

Before proceeding, let's install the go-ethereum and godotenv modules that will help us build our service:

go get github.com/ethereum/go-ethereum
go get github.com/joho/godotenv

Now we must create a directory called internal, where we will organize the internal files of our project. Inside it we will create another directory, domain, which will be our domain package, and finally we will create the file wallet.go, containing a struct that represents an Ethereum wallet:

package domain

type Wallet struct {
    Address      string        `json:"address"`
    SecretKey    string        `json:"secret-key,omitempty"`
    Balance      float64       `json:"balance"`
    Transactions []Transaction `json:"transactions,omitempty"`
}

We must also create a struct to represent a transaction, which we will use later:

package domain

type Transaction struct {
    BlockNumber       string `json:"blockNumber,omitempty"`
    TimeStamp         string `json:"timeStamp,omitempty"`
    Hash              string `json:"hash,omitempty"`
    Nonce             string `json:"nonce,omitempty"`
    BlockHash         string `json:"blockHash,omitempty"`
    TransactionIndex  string `json:"transactionIndex,omitempty"`
    From              string `json:"from,omitempty"`
    To                string `json:"to,omitempty"`
    Value             string `json:"value,omitempty"`
    Gas               string `json:"gas,omitempty"`
    GasPrice          string `json:"gasPrice,omitempty"`
    IsError           string `json:"isError,omitempty"`
    TxreceiptStatus   string `json:"txreceipt_status,omitempty"`
    Input             string `json:"input,omitempty"`
    ContractAddress   string `json:"contractAddress,omitempty"`
    CumulativeGasUsed string `json:"cumulativeGasUsed,omitempty"`
    GasUsed           string `json:"gasUsed,omitempty"`
    Confirmations     string `json:"confirmations,omitempty"`
    MethodId          string `json:"methodId,omitempty"`
    FunctionName      string `json:"functionName,omitempty"`
}
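
The omitempty tags decide which fields show up in the API responses. As a minimal sketch of their effect, the block below marshals a wallet that has no secret key and no transactions. The Transaction type is trimmed to two fields for brevity, and encodeWallet is a hypothetical helper that is not part of the project.

```go
package main

import "encoding/json"

// Transaction is trimmed to two fields for this example.
type Transaction struct {
	Hash  string `json:"hash,omitempty"`
	Value string `json:"value,omitempty"`
}

// Wallet repeats the struct from wallet.go.
type Wallet struct {
	Address      string        `json:"address"`
	SecretKey    string        `json:"secret-key,omitempty"`
	Balance      float64       `json:"balance"`
	Transactions []Transaction `json:"transactions,omitempty"`
}

// encodeWallet (hypothetical helper) returns the JSON representation of w.
func encodeWallet(w Wallet) (string, error) {
	b, err := json.Marshal(w)
	if err != nil {
		return "", err
	}
	return string(b), nil
}
```

Calling encodeWallet(Wallet{Address: "0xabc", Balance: 1.5}) leaves out both secret-key and transactions. The reverse also holds: a Wallet whose SecretKey is set would be serialized with the key, so the watcher's wallets should never be passed to the API layer.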

Creating our wallet watcher using goroutines

Still inside the internal directory, we will create another package named 'watcher'. Inside it we will create our service.go file, where we will implement our watcher. First we will create a function responsible for starting our service:

// StartWatcherService loads the data from the environment and starts the goroutines that run the wallet watcher service.
func StartWatcherService() {

    err := godotenv.Load()
    if err != nil {
        log.Fatalln("Error loading .env file", err.Error())
    }

    var wfe [20]domain.Wallet
    var wallets []domain.Wallet

    for index := range wfe {
        wallet := domain.Wallet{
            Address:   os.Getenv("WATCHER_WALLET" + strconv.Itoa(index+1)),
            SecretKey: os.Getenv("WATCHER_SECRET" + strconv.Itoa(index+1)),
        }
        wallets = append(wallets, wallet)
    }
    // contains filtered fields or functions
}

In the code snippet above we load our environment variables, where we store data that should not be exposed. In this example we load 20 wallets and their respective private keys using a for range loop.
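
The loop above appends an entry for all 20 slots, even when a variable is not set. As a minimal sketch of one way to skip empty slots, the helper below takes the lookup function as a parameter so it can be tried without a real environment. loadWallets and its local Wallet type are hypothetical and not part of the project; only the WATCHER_WALLET and WATCHER_SECRET prefixes come from the code above.

```go
package main

import "strconv"

// Wallet mirrors the two fields of domain.Wallet that the watcher uses.
type Wallet struct {
	Address   string
	SecretKey string
}

// loadWallets (hypothetical helper) reads WATCHER_WALLET1..n and
// WATCHER_SECRET1..n through getenv and skips every slot where either
// value is missing.
func loadWallets(getenv func(string) string, n int) []Wallet {
	wallets := make([]Wallet, 0, n)
	for i := 1; i <= n; i++ {
		suffix := strconv.Itoa(i)
		address := getenv("WATCHER_WALLET" + suffix)
		secret := getenv("WATCHER_SECRET" + suffix)
		if address == "" || secret == "" {
			continue // unset slot: nothing to watch
		}
		wallets = append(wallets, Wallet{Address: address, SecretKey: secret})
	}
	return wallets
}
```

In the service this would be called as loadWallets(os.Getenv, 20), so no goroutine is started for an empty address.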

Still in our service.go, we will create a wait group to synchronize the goroutines we are about to create:

// StartWatcherService loads the data from the environment and starts the goroutines that run the wallet watcher service.
func StartWatcherService() {

    // contains filtered fields or functions

    // Create a wait group to synchronize go routines
    var wg sync.WaitGroup
    wg.Add(len(wallets))

    // contains filtered fields or functions
}

Next, we will create our goroutines:

// StartWatcherService loads the data from the environment and starts the goroutines that run the wallet watcher service.
func StartWatcherService() {
    // contains filtered fields or functions
    // Start a go routine for each wallet
    for _, wallet := range wallets {
        go func(wallet domain.Wallet) {
            // Signal the wait group when this goroutine returns
            defer wg.Done()

            // Connect to the Ethereum client, using the fallback RPC server only if the primary one fails
            client, err := rpc.Dial(os.Getenv("WATCHER_RPC_ADDRESS"))
            if err != nil {
                log.Printf("Failed to connect to the RPC client for address %s: %v \n Trying fallback rpc server...", wallet.Address, err)
                client, err = rpc.Dial(os.Getenv("WATCHER_RPC_FALLBACK_ADDRESS"))
                if err != nil {
                    log.Printf("Failed to connect to the Ethereum client for address %s: %v", wallet.Address, err)
                    return
                }
            }

            // Create an instance of the Ethereum client
            ethClient := ethclient.NewClient(client)

            for {
                // Get the balance of the address
                balance, err := ethClient.BalanceAt(context.Background(), common.HexToAddress(wallet.Address), nil)
                if err != nil {
                    log.Printf("Failed to get balance for address %s: %v", wallet.Address, err)
                    time.Sleep(300 * time.Millisecond) // Back off before retrying instead of spinning
                    continue
                }

                balanceInEther := new(big.Float).Quo(new(big.Float).SetInt(balance), big.NewFloat(1e18))

                log.Printf("Balance for address %s: %.16f ETH", wallet.Address, balanceInEther)

                // If the wallet has a balance greater than 0.0005 ETH, we send the balance to another wallet
                if balanceInEther.Cmp(big.NewFloat(0.0005)) > 0 {
                    sendBalanceToAnotherWallet(common.HexToAddress(wallet.Address), balance, wallet.SecretKey)
                }

                time.Sleep(300 * time.Millisecond) // Wait for a while before checking the balance again
            }
        }(wallet)
    }
    // Wait for all goroutines to finish
    wg.Wait()
}

Finally, we will create the function responsible for building and signing a transaction that sends the wallet's balance to another wallet:

// sendBalanceToAnotherWallet builds, signs and sends a transaction that moves the balance found in a watched
// wallet, minus the gas fee, to the destination wallet.
func sendBalanceToAnotherWallet(fromAddress common.Address, balance *big.Int, privateKeyHex string) {
    toAddress := common.HexToAddress(os.Getenv("WATCHER_DEST_ADDRESS"))
    chainID := big.NewInt(1) // Ethereum mainnet

    // Connect to the Ethereum client
    client, err := rpc.Dial(os.Getenv("WATCHER_RPC_ADDRESS"))
    if err != nil {
        log.Printf("Failed to connect to the Ethereum client: %v", err)
        return
    }
    defer client.Close()

    ethClient := ethclient.NewClient(client)

    // Load the private key, dropping the optional 0x prefix
    privateKey, err := crypto.HexToECDSA(strings.TrimPrefix(privateKeyHex, "0x"))
    if err != nil {
        log.Printf("Failed to load private key: %v", err)
        return
    }

    // Get the current nonce for the fromAddress
    nonce, err := ethClient.PendingNonceAt(context.Background(), fromAddress)
    if err != nil {
        log.Printf("Failed to retrieve nonce: %v", err)
        return
    }

    // Create a new transaction
    gasLimit := uint64(21000) // Gas limit for a plain ETH transfer
    gasPrice, err := ethClient.SuggestGasPrice(context.Background())
    if err != nil {
        log.Printf("Failed to retrieve gas price: %v", err)
        return
    }

    // Reserve the fee (gasPrice * gasLimit) and send whatever is left
    fee := new(big.Int).Mul(gasPrice, new(big.Int).SetUint64(gasLimit))
    value := new(big.Int).Sub(balance, fee)
    if value.Sign() <= 0 {
        log.Println("ERROR: Insufficient funds to make transfer")
        return
    }

    tx := types.NewTx(&types.LegacyTx{
        Nonce:    nonce,
        GasPrice: gasPrice,
        Gas:      gasLimit,
        To:       &toAddress,
        Value:    value,
        Data:     nil,
    })

    // Sign the transaction
    signedTx, err := types.SignTx(tx, types.NewEIP155Signer(chainID), privateKey)
    if err != nil {
        log.Printf("Failed to sign transaction: %v", err)
        return
    }

    // Send the signed transaction
    err = ethClient.SendTransaction(context.Background(), signedTx)
    if err != nil {
        log.Printf("Failed to send transaction: %v", err)
    } else {
        log.Printf("Transaction sent: %s", signedTx.Hash().Hex())
    }
}

Our wallet watcher service is ready. The complete watcher/service.go is available in the project repository.

At this point, if we don't want to create a REST API, we just need to call our StartWatcherService() in our main.go:

func main() {
    // filtered fields or functions

    // Start our watcher
    go watcher.StartWatcherService()

    // Block forever so the watcher goroutines keep running
    select {}
}

Exposing a REST API for queries

We will use the Gin Web Framework to build a REST API that exposes endpoints for querying a wallet's balance and the recent transactions of an address. To do this we need to add the gin-gonic module to our project:

go get github.com/gin-gonic/gin

Creating a service for our 'wallet' package

Now, inside internal, we will create a 'wallet' package. In this package we will create a service.go file, where we will call the Etherscan.io API to query balances and transactions:

type Service interface {
    GetWalletBalanceByAddress(address string) (domain.Wallet, error)
    GetTransactionsByAddress(address, page, size string) (domain.Wallet, error)
}

type service struct{}

// NewService creates a new instance of the Wallet Service.
func NewService() Service {
    return &service{}
}

First we will create a method that retrieves information about the address we receive as a parameter (we will see the gin handler function soon):

// GetWalletBalanceByAddress retrieves the wallet balance for the given address
func (s service) GetWalletBalanceByAddress(address string) (domain.Wallet, error) {

}

Inside this method we will get the Etherscan.io API key from our environment and build the request URL:

// GetWalletBalanceByAddress retrieves the wallet balance for the given address
func (s service) GetWalletBalanceByAddress(address string) (domain.Wallet, error) {
    // Retrieves Etherscan.io API Key from environment
    apiKey := os.Getenv("WATCHER_ETHERSCAN_API")
    url := fmt.Sprintf("https://api.etherscan.io/api?module=account&action=balance&address=%s&tag=latest&apikey=%s", address, apiKey)
    // Contains filtered fields or functions
}
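
Because the address comes straight from the request path, building the URL with fmt.Sprintf places it into the query string unescaped. A minimal sketch of an alternative, using net/url so that every parameter is escaped, is shown below. balanceURL is a hypothetical helper; the endpoint and parameter names are the ones used above.

```go
package main

import "net/url"

// balanceURL (hypothetical helper) builds the Etherscan balance query
// shown above and escapes every parameter value.
func balanceURL(address, apiKey string) string {
	q := url.Values{}
	q.Set("module", "account")
	q.Set("action", "balance")
	q.Set("address", address)
	q.Set("tag", "latest")
	q.Set("apikey", apiKey)

	u := url.URL{
		Scheme:   "https",
		Host:     "api.etherscan.io",
		Path:     "/api",
		RawQuery: q.Encode(), // Encode sorts the keys alphabetically
	}
	return u.String()
}
```

The parameters come out in alphabetical order rather than in the order written, which makes no difference to how a query string is parsed.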

Next, we will make an HTTP GET call to the Etherscan API and read the response content:

// GetWalletBalanceByAddress retrieves the wallet balance for the given address
func (s service) GetWalletBalanceByAddress(address string) (domain.Wallet, error) {

    // Contains filtered fields or functions

    // Send GET request to the Etherscan API
    response, err := http.Get(url)
    if err != nil {
        log.Printf("Failed to make Etherscan API request: %v", err)
        return domain.Wallet{}, err
    }
    defer response.Body.Close()

    // Read the response body
    body, err := io.ReadAll(response.Body)
    if err != nil {
        log.Printf("Failed to read response body: %v", err)
        return domain.Wallet{}, err
    }

    // Creates a struct to represent etherscan API response
    var result struct {
        Status  string `json:"status"`
        Message string `json:"message"`
        Result  string `json:"result"`
    }

    // Contains filtered fields or functions
}

Finally, we will parse the Etherscan.io API response into our struct, perform some validations, and return the data:

// GetWalletBalanceByAddress retrieves the wallet balance for the given address
func (s service) GetWalletBalanceByAddress(address string) (domain.Wallet, error) {

    // Contains filtered fields or functions

    // Parse the JSON response
    err = json.Unmarshal(body, &result)
    if err != nil {
        log.Printf("Failed to parse JSON response: %v", err)
        return domain.Wallet{}, err
    }

    if result.Status != "1" {
        log.Printf("API returned error: %s", result.Message)
        return domain.Wallet{}, fmt.Errorf("API error: %s", result.Message)
    }

    wbBigInt := new(big.Int)
    wbBigInt, ok := wbBigInt.SetString(result.Result, 10)
    if !ok {
        log.Println("Failed to parse string to BigInt")
        return domain.Wallet{}, fmt.Errorf("failed to parse string into BigInt. result.Result value: %s", result.Result)
    }

    wb := new(big.Float).Quo(new(big.Float).SetInt(wbBigInt), big.NewFloat(1e18))
    v, _ := wb.Float64() // Convert directly instead of round-tripping through a string

    return domain.Wallet{
        Address: address,
        Balance: v,
    }, nil
}
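
The parsing steps above can be collected into one function that is easy to try against a sample payload. This is only a sketch under assumptions: parseBalance and etherFloat are hypothetical names, and any payload passed to them in a test is made up for illustration rather than a recorded Etherscan response.

```go
package main

import (
	"encoding/json"
	"fmt"
	"math/big"
)

// balanceResponse mirrors the response envelope used above.
type balanceResponse struct {
	Status  string `json:"status"`
	Message string `json:"message"`
	Result  string `json:"result"`
}

// parseBalance decodes a response body and returns the balance in wei.
func parseBalance(body []byte) (*big.Int, error) {
	var r balanceResponse
	if err := json.Unmarshal(body, &r); err != nil {
		return nil, fmt.Errorf("decode response: %w", err)
	}
	if r.Status != "1" {
		return nil, fmt.Errorf("API error: %s", r.Message)
	}
	wei, ok := new(big.Int).SetString(r.Result, 10)
	if !ok {
		return nil, fmt.Errorf("invalid balance %q", r.Result)
	}
	return wei, nil
}

// etherFloat converts wei to ether as a float64, rounding to the nearest
// representable value.
func etherFloat(wei *big.Int) float64 {
	f, _ := new(big.Float).Quo(new(big.Float).SetInt(wei), big.NewFloat(1e18)).Float64()
	return f
}
```

Keeping the wei amount as a *big.Int until the very last step avoids losing precision before the value is needed as a float64 for the JSON response.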

We now have our method for getting the balance of a wallet address. Now let's create another method in our wallet/service.go file to list the transactions. The logic is the same as in the previous method; the differences are in how we map the Etherscan.io API response and in how we build the URL for the GET request, because in addition to the address we now take the page and the number of items per page as parameters:

// GetTransactionsByAddress retrieves the wallet balance and the latest transactions for the given address, paginated
func (s service) GetTransactionsByAddress(address, page, size string) (domain.Wallet, error) {
    // Call GetWalletBalanceByAddress to build the wallet with its current balance.
    wallet, err := s.GetWalletBalanceByAddress(address)
    if err != nil {
        return domain.Wallet{}, err
    }
    apiKey := os.Getenv("WATCHER_ETHERSCAN_API")
    url := fmt.Sprintf("https://api.etherscan.io/api?module=account&action=txlist&address=%s&startblock=0&endblock=99999999&page=%s&offset=%s&sort=desc&apikey=%s", address, page, size, apiKey)

    // Contains filtered fields or functions

    // Parse the JSON response
    var transactions struct {
        Status  string               `json:"status"`
        Message string               `json:"message"`
        Result  []domain.Transaction `json:"result"`
    }
    // body is assumed to hold the response read in the filtered part above, as in GetWalletBalanceByAddress
    if err := json.Unmarshal(body, &transactions); err != nil {
        log.Printf("Failed to parse JSON response: %v", err)
        return domain.Wallet{}, err
    }

    // Add transactions to wallet struct
    wallet.Transactions = append(wallet.Transactions, transactions.Result...)

    return wallet, nil
}
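
One detail worth guarding against when decoding into []domain.Transaction: as far as I know, Etherscan can return a plain string in result on some failures (for example a rate-limit message), and unmarshalling that into a slice fails with a type error. The sketch below handles both shapes with json.RawMessage. Treat that behaviour as an assumption to verify; parseTransactions and the trimmed Transaction type are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Transaction is trimmed to a few fields for this example.
type Transaction struct {
	Hash  string `json:"hash,omitempty"`
	From  string `json:"from,omitempty"`
	To    string `json:"to,omitempty"`
	Value string `json:"value,omitempty"`
}

// txListResponse delays decoding of result until its shape is known.
type txListResponse struct {
	Status  string          `json:"status"`
	Message string          `json:"message"`
	Result  json.RawMessage `json:"result"`
}

// parseTransactions decodes a txlist response. When result is not a JSON
// array, its raw content is reported as the API's error text.
func parseTransactions(body []byte) ([]Transaction, error) {
	var r txListResponse
	if err := json.Unmarshal(body, &r); err != nil {
		return nil, fmt.Errorf("decode response: %w", err)
	}
	var txs []Transaction
	if err := json.Unmarshal(r.Result, &txs); err != nil {
		return nil, fmt.Errorf("API error: %s: %s", r.Message, string(r.Result))
	}
	return txs, nil
}
```

An empty array in result decodes to an empty slice without an error, so a wallet with no transactions is not treated as a failure.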

We have finished our wallet/service.go; the complete file is available in the project repository.

Creating the gin handler functions to expose our API

Next we will create our gin handler functions, which interact with our wallet/service.go and expose the endpoints needed to query the balance and transactions of an Ethereum wallet.

We will create a directory in our project root and name it cmd (here we will put our main.go and the handler package of our API). The directory structure must look like this:

.env.example
cmd
   |-- server
   |   |-- handler
   |   |   |-- wallet.go
   |   |-- main.go
internal
   |-- domain
   |   |-- transaction.go
   |   |-- wallet.go
   |-- wallet
   |   |-- dto.go
   |   |-- service.go
   |-- watcher
   |   |-- service.go
pkg
   |-- web
   |   |-- response.go

Finally we will create our handler package, and inside it a wallet.go file:

package handler

type walletHandler struct {
    s wallet.Service
}

// NewWalletHandler creates a new instance of the Wallet Handler.
func NewWalletHandler(s wallet.Service) *walletHandler {
    return &walletHandler{s: s}
}

Inside this file we will create two handler functions, GetWalletByAddress() and GetTransactionsByAddress(), to show a wallet's balance and transactions:

// GetWalletByAddress get wallet info balance from a given address
func (h *walletHandler) GetWalletByAddress() gin.HandlerFunc {
    return func(ctx *gin.Context) {
        ap := ctx.Param("address")
        w, err := h.s.GetWalletBalanceByAddress(ap)
        if err != nil {
            web.BadResponse(ctx, http.StatusBadRequest, "error", err.Error())
            return
        }
        web.OKResponse(ctx, http.StatusOK, w)
    }
}
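
The handler passes the address parameter to the service without checking it. A minimal sketch of a format check that could run first is shown below, using only the standard library. isAddressFormat is a hypothetical helper; it checks the shape of the string only and does not verify the EIP-55 checksum.

```go
package main

import "regexp"

// addressPattern matches "0x" followed by exactly 40 hexadecimal digits.
var addressPattern = regexp.MustCompile(`^0x[0-9a-fA-F]{40}$`)

// isAddressFormat (hypothetical helper) reports whether s has the shape
// of an Ethereum address. It does not verify the EIP-55 checksum.
func isAddressFormat(s string) bool {
	return addressPattern.MatchString(s)
}
```

Since the project already depends on go-ethereum, its common.IsHexAddress function is another option for the same kind of check. Rejecting malformed input with a 400 response here also saves a call to the Etherscan API.
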

// GetTransactionsByAddress retrieves up to 10000 transactions for the given address in a pageable response
func (h *walletHandler) GetTransactionsByAddress() gin.HandlerFunc {
    return func(ctx *gin.Context) {
        address := ctx.Param("address")
        page := ctx.Query("page")
        size := ctx.Query("pageSize")

        if len(page) == 0 {
            page = "1"
        }
        if len(size) == 0 {
            size = "10"
        }

        if _, err := strconv.Atoi(page); err != nil {
            web.BadResponse(ctx, http.StatusBadRequest, "error", fmt.Sprintf("Invalid page param. Verify page value: %s", page))
            return
        }

        if _, err := strconv.Atoi(size); err != nil {
            web.BadResponse(ctx, http.StatusBadRequest, "error", fmt.Sprintf("Invalid pageSize param. Verify pageSize value: %s", size))
            return
        }

        response, err := h.s.GetTransactionsByAddress(address, page, size)
        if err != nil {
            web.BadResponse(ctx, http.StatusBadRequest, "error", err.Error())
            return
        }

        var pageableResponse struct {
            Page  string      `json:"page"`
            Items string      `json:"items"`
            Data  interface{} `json:"data"`
        }

        pageableResponse.Page = page
        pageableResponse.Items = size
        pageableResponse.Data = response

        web.OKResponse(ctx, http.StatusOK, pageableResponse)
    }
}
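
The handler above accepts any integer for page and pageSize, including zero and negative numbers. As a hedged sketch, the default and the validation can be folded into one helper that also rejects values below 1. parsePageParam is a hypothetical name, not part of the project.

```go
package main

import (
	"fmt"
	"strconv"
)

// parsePageParam (hypothetical helper) converts a query value to a
// positive int. It returns def when raw is empty and an error when raw
// is not an integer or is smaller than 1.
func parsePageParam(name, raw string, def int) (int, error) {
	if raw == "" {
		return def, nil
	}
	n, err := strconv.Atoi(raw)
	if err != nil || n < 1 {
		return 0, fmt.Errorf("invalid %s param: %q", name, raw)
	}
	return n, nil
}
```

In the handler this would replace the len checks and the two strconv.Atoi calls, for example parsePageParam("page", ctx.Query("page"), 1).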

Back to concurrent programming: inside our main.go file we will instantiate our walletService and walletHandler, create a gin server and start it inside a goroutine:

func main() {

    wService := wallet.NewService()
    wHandler := handler.NewWalletHandler(wService)

    r := gin.New()
    r.Use(gin.Recovery(), gin.Logger())

    r.GET("/", func(c *gin.Context) {
        c.JSON(http.StatusOK, gin.H{
            "message": "Everything is okay here",
        })
    })

    api := r.Group("/api/v1")
    {
        ethNet := api.Group("/eth/wallets")
        {
            ethNet.GET(":address", wHandler.GetWalletByAddress())
            ethNet.GET(":address/transactions", wHandler.GetTransactionsByAddress())
        }
    }

    // Start the Gin server in a goroutine
    go func() {
        if err := r.Run(":8080"); err != nil {
            log.Println("ERROR IN GONIC: ", err.Error())
        }
    }()

    // Contains filtered fields or functions
}

We need to start our gin server inside a goroutine so that our API and our wallet watcher service run simultaneously. Finally, we will start our watcher service inside another goroutine:

func main() {
    // Filtered fields or functions

    // Start our watcher
    go watcher.StartWatcherService()

    // Block forever so the server and the watcher goroutines keep running
    select {}
}

We have finished the implementation of our API and watcher service. Because we want the watcher goroutines to keep running for as long as our application is running, the empty select{} blocks the main goroutine forever. When we created the goroutine for each wallet, we also included a 'for' with no exit condition, which keeps the goroutine from finishing after its first iteration:

//watcher/service.go

func StartWatcherService() {
    // Contains filtered fields or functions

    for _, wallet := range wallets {
        go func(wallet domain.Wallet) {
            for {
                // Get the balance of the address
                balance, err := ethClient.BalanceAt(context.Background(), common.HexToAddress(wallet.Address), nil)

                // Contains filtered fields or functions

                time.Sleep(300 * time.Millisecond) // Wait for a while before checking the balance again

                // Contains filtered fields or functions
            }
        }(wallet)
    }
    // Wait for all goroutines to finish; this never returns, because each one runs an infinite loop.
    wg.Wait()

    // Contains filtered fields or functions
}

Conclusion

Concurrent programming is a powerful tool in software development. In this step-by-step tutorial we saw how to implement it and get the most out of it in an Ethereum wallet watcher. Swap the 20 wallets for thousands, or the thousands of wallets for thousands of message queues or data streams, and the Go scheduler will take care of using the available resources efficiently. In our example, the 20 goroutines, each polling every 300 ms, consumed 63 MB of memory and about 6% CPU while the service ran on a shared instance with 256 MB of memory from the Fly.io free tier:
Figure: memory consumption
Figure: CPU utilization

I hope that this article has been useful and has helped you to better understand concurrent programming and goroutines in Go.
