Dragonfly and Celery: Powering Financial Transactions

Learn how Dragonfly and Celery seamlessly integrate to build a financial transaction system based on Ethereum, providing a robust solution for efficient, high-volume data processing.

May 14, 2024

Introduction

As many of our readers already know, Dragonfly is a state-of-the-art, multi-threaded, high-performance drop-in replacement for Redis. As the next-generation in-memory data store, Dragonfly promises not only to accelerate data operations but also to seamlessly integrate with the existing frameworks and ecosystems that traditionally relied on Redis.

We have successfully integrated Dragonfly with various task queues and background job processing systems, including BullMQ for Node.js and Sidekiq for Ruby. These integrations demonstrate Dragonfly's superior compatibility and performance. In this post, I will showcase how Dragonfly can be integrated with Celery, a distributed task queue popular within the Python community. Notably, while Celery is primarily written in Python, it also supports client libraries in various other languages.

Celery is highly versatile, supporting a variety of message brokers and backends like RabbitMQ, Redis, Amazon SQS, and ZooKeeper. Because Celery can use Redis both as a message broker and as backend storage, Dragonfly is a natural choice for handling more demanding Celery workloads.

This blog post embarks on a journey to build a sophisticated transaction consolidation system around the Ethereum blockchain. Join us as we discuss the technical setup, challenges, and solutions involved in integrating Dragonfly with Celery to manage Ethereum blockchain transactions efficiently. All code snippets and examples discussed here can be found in our dragonfly-examples repository.


Ethereum Blockchain as a Payment Transaction API

When venturing into the realm of blockchain technology, it's easy to get overwhelmed by many of the concepts and terminologies involved. However, for the purposes of our example project, a deep dive into the Ethereum blockchain is not necessary. Instead, we can think of Ethereum somewhat simplistically—as a robust payment or banking transaction API.

In traditional banking systems, transactions are processed and verified by the bank itself. On Ethereum, the network does this work, and, much like in typical banking APIs, transactions are not confirmed instantaneously. Each transaction is added to a block and must undergo a verification process by the network before it is finalized. This means we need to periodically check the status of each transaction until it is fully consolidated. This confirmation latency is a critical factor that our example project's consolidation system must manage correctly and efficiently.

By treating Ethereum as a payment API, we simplify the interaction model but still handle the complexities of asynchronous transaction verifications. This approach allows us to focus on building a system using Dragonfly and Celery to manage and streamline these transactions efficiently, enhancing the user experience by reducing wait times and increasing reliability.


Transaction Submission & Consolidation Service

Enough context—let's dive into the technical details of our example project.

Database Schema

A well-designed database schema is crucial for managing and processing transactions in a system handling blockchain interactions like Ethereum, or frankly in any production application. For the purposes of this blog post, we have opted for simplicity while maintaining a sound design. Our schema includes two primary tables: user_accounts and user_account_transactions.

-- SQLite
CREATE TABLE user_accounts
(
    id                       INTEGER PRIMARY KEY,
    available_balance_in_wei INTEGER NOT NULL,
    current_balance_in_wei   INTEGER NOT NULL
);

The user_accounts table stores the user's account information, including their available and current balances. The available balance is the amount of funds that can be withdrawn immediately. The current balance still includes funds tied up in pending transactions that have not yet been consolidated, so it is only reduced once a transaction is confirmed.

-- SQLite
CREATE TABLE user_account_transactions
(
    id                                INTEGER PRIMARY KEY,
    user_account_id                   INTEGER NOT NULL,
    transaction_hash                  TEXT    NOT NULL,
    from_public_address               TEXT    NOT NULL,
    to_public_address                 TEXT    NOT NULL,
    transaction_amount_in_wei         INTEGER NOT NULL,
    transaction_fee_total_in_wei      INTEGER NOT NULL,
    transaction_fee_blockchain_in_wei INTEGER NOT NULL,
    status                            TEXT    NOT NULL DEFAULT 'PENDING' CHECK (status IN ('PENDING', 'SUCCESSFUL', 'FAILED')),

    FOREIGN KEY (user_account_id) REFERENCES user_accounts (id)
);

The user_account_transactions table stores the details of each transaction, including the sender and recipient addresses, the transaction amount, and the transaction fee. An address is a unique identifier for an account on the Ethereum blockchain, and the amounts are denoted in Wei, the smallest unit of the Ethereum cryptocurrency. Ether is the main unit of currency on the Ethereum blockchain. Unlike conventional currencies, which mostly have two decimal places, Ether has 18 decimal places, which means that 1 Ether = 10^18 Wei. We store the amounts in Wei as integers to avoid floating-point precision issues. Keep in mind that SQLite's INTEGER is a signed 64-bit value, so it tops out at roughly 9.22 × 10^18 Wei (about 9.22 Ether); for larger amounts, use an arbitrary-precision NUMERIC or DECIMAL data type if your database supports one. Finally, the status field indicates the current state of the transaction, which can be PENDING, SUCCESSFUL, or FAILED.
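
To make the Ether-to-Wei relationship concrete, here is a minimal sketch of converting between the two without ever touching floats. The helper names ether_to_wei and wei_to_ether are our own illustration and are not taken from the example repository.

```python
from decimal import Decimal, localcontext

WEI_PER_ETHER = 10 ** 18  # 1 Ether = 10^18 Wei


def ether_to_wei(amount_in_ether: str) -> int:
    """Convert a decimal Ether string to an integer Wei amount."""
    with localcontext() as ctx:
        ctx.prec = 100  # generous precision so the multiplication stays exact
        wei = Decimal(amount_in_ether) * WEI_PER_ETHER
        # Anything smaller than 1 Wei cannot be represented on chain.
        if wei != wei.to_integral_value():
            raise ValueError("amount has more than 18 decimal places")
        return int(wei)


def wei_to_ether(amount_in_wei: int) -> Decimal:
    """Convert an integer Wei amount to Ether, e.g. for display."""
    with localcontext() as ctx:
        ctx.prec = 100
        return Decimal(amount_in_wei) / WEI_PER_ETHER
```

Taking the Ether amount as a string rather than a float is deliberate: a float such as 0.1 has already lost precision before the conversion starts.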

Celery Task Configuration

Once a transaction is submitted to the Ethereum blockchain, we deduct the transaction amount from the user's available balance. Note that the user's current balance remains unchanged until the transaction is confirmed on the blockchain. A new transaction record is also created in the user_account_transactions table with the status set to PENDING.
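
The bookkeeping described above could be sketched as follows with Python's built-in sqlite3 module. This is an illustration under our own assumptions, not the repository's implementation: the schema is trimmed to the columns the sketch needs, fees are ignored, the function name record_submission is hypothetical, and "0xabc" is a placeholder hash.

```python
import sqlite3

# Trimmed version of the schema above; fee and address columns are left out.
SCHEMA = """
CREATE TABLE user_accounts (
    id                       INTEGER PRIMARY KEY,
    available_balance_in_wei INTEGER NOT NULL,
    current_balance_in_wei   INTEGER NOT NULL
);
CREATE TABLE user_account_transactions (
    id                        INTEGER PRIMARY KEY,
    user_account_id           INTEGER NOT NULL,
    transaction_hash          TEXT    NOT NULL,
    transaction_amount_in_wei INTEGER NOT NULL,
    status                    TEXT    NOT NULL DEFAULT 'PENDING',
    FOREIGN KEY (user_account_id) REFERENCES user_accounts (id)
);
"""


def record_submission(conn: sqlite3.Connection, account_id: int,
                      txn_hash: str, amount_in_wei: int) -> int:
    """Deduct the available balance and insert a PENDING row atomically."""
    with conn:  # commits on success, rolls back if an exception escapes
        cur = conn.execute(
            "UPDATE user_accounts "
            "SET available_balance_in_wei = available_balance_in_wei - ? "
            "WHERE id = ? AND available_balance_in_wei >= ?",
            (amount_in_wei, account_id, amount_in_wei),
        )
        if cur.rowcount != 1:
            raise ValueError("unknown account or insufficient available balance")
        cur = conn.execute(
            "INSERT INTO user_account_transactions "
            "(user_account_id, transaction_hash, transaction_amount_in_wei) "
            "VALUES (?, ?, ?)",
            (account_id, txn_hash, amount_in_wei),
        )
        return cur.lastrowid


conn = sqlite3.connect(":memory:")
conn.executescript(SCHEMA)
conn.execute("INSERT INTO user_accounts VALUES (1, 1000, 1000)")
txn_id = record_submission(conn, 1, "0xabc", 400)
```

After the call, the account's available balance has dropped by the transaction amount while the current balance is untouched, and the new row carries the default PENDING status.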

To ensure our transaction consolidation system is both robust and resilient, we utilize Celery's task queue capabilities to manage the asynchronous checking and updating of transaction statuses. Here, we define a critical component: the Celery task responsible for reconciling transactions on the Ethereum blockchain. Although this is a specific example, the process can be similar to the reconciliation or settlement processes found in many financial systems.

from celery import Celery

# Celery app configuration.
# Use Dragonfly as the message broker and the result storage backend for Celery.
app = Celery(
    'tasks',
    broker=get_constants().get_celery_broker_url(),   # Dragonfly connection string.
    backend=get_constants().get_celery_backend_url(), # Dragonfly connection string.
)

As shown above, we first configure the Celery application to use Dragonfly as both the message broker and the result storage backend. Since Dragonfly is wire-compatible with Redis, ensuring seamless operation simply requires the correct connection string for Celery to work smoothly with Dragonfly.
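
For illustration, the connection strings returned by the helpers above might be built like this. The function dragonfly_url and the default host and port are assumptions on our part; the only fixed point is that Celery expects a URL with the redis:// scheme, which works unchanged against a Dragonfly server.

```python
from urllib.parse import quote


def dragonfly_url(host: str = "localhost", port: int = 6379,
                  db: int = 0, password: str = "") -> str:
    """Build a redis:// URL that Celery can use to reach a Dragonfly server."""
    # Percent-encode the password so characters like '@' do not break the URL.
    auth = f":{quote(password, safe='')}@" if password else ""
    return f"redis://{auth}{host}:{port}/{db}"


# Hypothetical values; in the example project these come from get_constants().
CELERY_BROKER_URL = dragonfly_url()
CELERY_BACKEND_URL = dragonfly_url()
```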

from typing import Final

# Constants.
TASK_MAX_RETRIES: Final[int] = 32
TASK_RETRY_BACKOFF: Final[int] = 100
TASK_RETRY_BACKOFF_MAX: Final[int] = 1600

# Define and configure a Celery task to reconcile a transaction.
# TaskRetryException is not part of Celery; it is the example project's own
# exception type (definition not shown here). Raising it triggers a retry.
@app.task(
    bind=True,
    autoretry_for=(TaskRetryException,),
    max_retries=TASK_MAX_RETRIES,
    retry_backoff=TASK_RETRY_BACKOFF,
    retry_backoff_max=TASK_RETRY_BACKOFF_MAX,
    retry_jitter=False,
)
def reconcile_transaction(_self, txn_id: str):
    # Implementation details omitted for brevity.
    ...

Next, we define the reconcile_transaction Celery task, which is responsible for checking the status of a transaction on the Ethereum blockchain. Celery tasks are very flexible and can be configured with various options. In this example, we set the maximum number of retries to 32 and the base retry delay to 100 seconds. The retry delay grows exponentially, starting at 100 seconds and doubling with each retry, and because retry_jitter is disabled, the delays are deterministic rather than randomized. We also cap the backoff delay at 1600 seconds, resulting in a sequence of [100, 200, 400, 800, 1600, 1600, ...] seconds. This should be sufficient for the example use case, but you can adjust these values based on your specific requirements.
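
The delay sequence is easy to reproduce by hand. The sketch below is our own arithmetic for the jitter-free case, min(maximum, factor * 2^n) for the n-th retry, and not Celery's internal code; the function name backoff_schedule is hypothetical.

```python
from typing import List


def backoff_schedule(factor: int, maximum: int, max_retries: int) -> List[int]:
    """Return the delay in seconds before each retry, capped at `maximum`."""
    return [min(maximum, factor * 2 ** n) for n in range(max_retries)]


# Same values as TASK_RETRY_BACKOFF, TASK_RETRY_BACKOFF_MAX, and TASK_MAX_RETRIES.
schedule = backoff_schedule(factor=100, maximum=1600, max_retries=32)
total_seconds = sum(schedule)
```

Summing the schedule gives 46,300 seconds, so with these settings a transaction is polled for just under 13 hours before the task gives up.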

To learn more about Celery task configuration options, please refer to the Celery documentation on tasks.

Putting It All Together

With our database schema designed and our Celery task configured, we're now ready to outline how these elements integrate within our API to create the transaction service. The following code snippet illustrates the API endpoint responsible for initiating transactions and notifying Celery to reconcile them:

@api_app.post("/transactions")
async def create_transaction(
        req: utils.TransactionRequest,
        db: Session = Depends(get_deps().get_db_session),
        df: Dragonfly = Depends(get_deps().get_dragonfly),
        w3: Web3 = Depends(get_deps().get_web3),
) -> utils.TransactionResponse:
    # Step 1: Record the transaction in the database and send it to the blockchain.
    # Step 2: Send a task to Celery to reconcile the transaction status.
    # Return the transaction response to the caller.
    ...

The overall workflow of sending and reconciling a transaction is as follows:

[Figure: Transaction Service Flow]

The post("/transactions") API endpoint defined above handles the initial steps of the workflow: recording the transaction in the database and sending it to the blockchain. After these actions, it sends a task to Celery to reconcile the transaction status.

Once the transaction is successfully submitted to the blockchain, the reconcile_transaction Celery task is activated. This task manages the remainder of the workflow, primarily by checking the transaction status on the Ethereum blockchain. Given that immediate confirmation is unlikely, the task will repeatedly check the status based on the configurations we've set. Ultimately, the transaction will be marked as successful or failed, and the task will update both the transaction status and the user's current balance in the database.
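
Since the task body is omitted in this post, here is a rough sketch of the settlement logic it has to perform once a receipt status is known. Everything in it is an assumption for illustration: the Account and Transaction dataclasses, the apply_receipt function, the refund of the available balance on failure, and the local TaskRetryException stand-in. The one external fact it relies on is that an Ethereum transaction receipt reports status 1 for success and 0 for failure.

```python
from dataclasses import dataclass
from typing import Optional


class TaskRetryException(Exception):
    """Stand-in for the project's exception that makes Celery retry the task."""


@dataclass
class Account:
    available_balance_in_wei: int
    current_balance_in_wei: int


@dataclass
class Transaction:
    amount_in_wei: int
    status: str = "PENDING"


def apply_receipt(account: Account, txn: Transaction,
                  receipt_status: Optional[int]) -> None:
    """Settle a pending transaction; None means it is not in a block yet."""
    if txn.status != "PENDING":
        return  # already settled, so running the task twice is harmless
    if receipt_status is None:
        raise TaskRetryException("transaction not yet included in a block")
    if receipt_status == 1:
        txn.status = "SUCCESSFUL"
        # The funds have left the account for good.
        account.current_balance_in_wei -= txn.amount_in_wei
    else:
        txn.status = "FAILED"
        # Assumed behavior: give back what was reserved at submission time.
        account.available_balance_in_wei += txn.amount_in_wei
```

Fees are ignored here to keep the sketch short; a real implementation would also account for the fee columns in the schema.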

Although we have omitted some implementation details, the workflow of our transaction consolidation system is straightforward. Celery's retry mechanism provides a natural and efficient way to manage the reconciliation of blockchain transactions. This approach handles the uncertainties and delays inherent in blockchain operations, resolving transactions without overwhelming the system. The integration also shows that Dragonfly works with Celery without special handling. For the details skipped here, check out the full example code in our dragonfly-examples repository.


Finishing Touches

Before we conclude, let's enhance our example project further to improve its functionality and user experience with Dragonfly.

Dragonfly as a Lock Manager

Financial data is extremely sensitive and requires strong measures to maintain its integrity and security. A key challenge in handling financial transactions is preventing double-spending, where the same funds are transferred more than once. This issue often arises when a user attempts multiple transactions in rapid succession.

Ideally, we should manage our database operations in a way that serializes them and prevents concurrent access. However, for critical API calls like transaction submissions, it doesn't hurt to have an additional layer of protection on top of that. To do so, we can use Dragonfly as a distributed lock manager by leveraging the classic SET command with NX and EX options:

  • NX ensures that the lock is set only if it does not already exist, preventing multiple locks on the same account.
  • EX sets an expiration on the lock, ensuring it does not remain indefinitely and potentially block user accounts for too long.

[Figure: Dragonfly as a Lock Manager]

Note that the lock key is unique to each user account. Since the whole SET operation is atomic, only one of several concurrent requests for the same account can acquire the lock, and further submissions for that account are rejected until the lock expires. We can add this lock mechanism to our post("/transactions") API endpoint:

@api_app.post("/transactions")
async def create_transaction(
        req: utils.TransactionRequest,
        db: Session = Depends(get_deps().get_db_session),
        df: Dragonfly = Depends(get_deps().get_dragonfly),
        w3: Web3 = Depends(get_deps().get_web3),
) -> utils.TransactionResponse:
    # Step 0: Try to acquire a lock on the user account.
    lock_key = utils.user_account_lock_key(req.user_account_id)
    locked = df.set(
        name=lock_key, value=utils.LOCK_VALUE,
        nx=True, ex=utils.LOCK_EXPIRATION_SECONDS,
    )

    if not locked:
        raise HTTPException(
            status_code=409,
            detail="User account is locked because a transaction was submitted very recently. Please try again later.",
        )

    # Step 1: Record the transaction in the database and send it to the blockchain.
    # Step 2: Send a task to Celery to reconcile the transaction status.
    # Return the transaction response to the caller.

If you are very paranoid or your system naturally requires even more stringent locking mechanisms, the more sophisticated Redlock algorithm can be used to acquire distributed locks across multiple Dragonfly instances as well.
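
To see the NX and EX semantics in isolation, the sketch below mimics them with a small in-memory class so that it runs without a server. InMemoryLockStore, the injectable clock, and the key name "lock:account:1" are our own inventions for illustration; against a real Dragonfly instance the same set(...) call goes through the client, as in the endpoint above.

```python
import time
from typing import Callable, Dict, Optional, Tuple


class InMemoryLockStore:
    """Single-process stand-in that mimics SET key value NX EX seconds."""

    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self._clock = clock
        self._entries: Dict[str, Tuple[str, float]] = {}

    def set(self, name: str, value: str,
            nx: bool = False, ex: Optional[int] = None) -> Optional[bool]:
        now = self._clock()
        entry = self._entries.get(name)
        if entry is not None and entry[1] <= now:
            del self._entries[name]  # the key has expired
            entry = None
        if nx and entry is not None:
            return None  # like the Redis client: no value when NX blocks the write
        expires_at = now + ex if ex is not None else float("inf")
        self._entries[name] = (value, expires_at)
        return True


# A fake clock lets us step through time without sleeping.
now = [0.0]
store = InMemoryLockStore(clock=lambda: now[0])
first = store.set("lock:account:1", "1", nx=True, ex=5)   # lock acquired
second = store.set("lock:account:1", "1", nx=True, ex=5)  # rejected, lock held
now[0] = 6.0                                              # lock has expired
third = store.set("lock:account:1", "1", nx=True, ex=5)   # acquired again
```

The second attempt is refused while the lock is alive, and the third succeeds only because the expiration has passed, which is the behavior the endpoint relies on.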

Dragonfly as a Caching Layer

Last but not least, Dragonfly, as a cutting-edge in-memory data store, is ideally suited for acting as a caching layer to speed up our API responses. In this particular example, users tend to check their recent transactions frequently, as they could be eagerly waiting for transaction status updates. Below is an example of how we can cache the transaction details in Dragonfly to reduce the load on the database for the get("/transactions/{txn_id}") endpoint:

@api_app.get("/transactions/{txn_id}")
async def get_transaction(
        txn_id: int,
        db: Session = Depends(get_deps().get_db_session),
        df: Dragonfly = Depends(get_deps().get_dragonfly),
) -> utils.TransactionResponse:
    cache_key = utils.txn_cache_key(txn_id)

    # Try to read the transaction from Dragonfly first.
    cached_txn = df.hgetall(cache_key)

    # Empty cache value with only the ID.
    if len(cached_txn) == 1:
        raise HTTPException(status_code=404, detail="Transaction not found")
    # Cache hit.
    elif len(cached_txn) > 1:
        return utils.txn_dict_to_response(cached_txn)

    # Cache miss, read from the database.
    txn = db.query(models.UserAccountTransaction).get(txn_id)

    # If the transaction is not found, cache an empty value with only the ID and return a 404.
    # Caching an empty value is important to prevent cache penetrations.
    if txn is None:
        utils.hset_and_expire(df, cache_key, {"id": txn_id}, utils.CACHE_EMPTY_EXPIRATION_SECONDS)
        raise HTTPException(status_code=404, detail="Transaction not found")

    # Cache the transaction in Dragonfly and return the response.
    mapping = utils.txn_to_dict(txn)
    utils.hset_and_expire(df, cache_key, mapping, utils.CACHE_NORMAL_EXPIRATION_SECONDS)
    return utils.txn_to_response(txn)
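
The endpoint above distinguishes three outcomes by the size of the hash returned from HGETALL. A small helper can make that explicit; classify_cache_entry is a hypothetical name of ours and is not part of the example repository.

```python
from typing import Dict


def classify_cache_entry(cached: Dict[str, str]) -> str:
    """Map an HGETALL result to 'miss', 'negative', or 'hit'."""
    if not cached:
        return "miss"      # key absent: HGETALL returns an empty mapping
    if len(cached) == 1:
        return "negative"  # sentinel holding only the ID: known not to exist
    return "hit"           # full transaction record
```

Keeping the negative entry distinguishable from a plain miss is what lets repeated lookups of a nonexistent ID be answered from the cache instead of reaching the database each time.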

Conclusion

Throughout this post, we've illustrated the seamless integration of Celery with Dragonfly, underscoring Dragonfly's role as a superior drop-in replacement for Redis and highlighting Celery's robust task management capabilities. Our example application showcases Dragonfly in three distinct roles:

  • Lock Manager: Safeguarding against concurrent transaction submissions.
  • Cache Layer: Boosting system responsiveness by caching transaction details.
  • Broker and Backend for Celery: Facilitating reliable task queue management with efficient retry and backoff strategies. Once configured, Celery runs the tasks on its own, so transactions are reconciled efficiently and reliably.

These use cases demonstrate Dragonfly's versatility, making it an essential component in our system architecture. By using Dragonfly and Celery together, we improve operational efficiency and reliability and show that the pair can handle sophisticated data processing tasks.

Ready to elevate your data management solutions? Explore Dragonfly's features and see how it can transform your applications. Dive deeper into Dragonfly and connect with other developers by joining our Discord community.
