How to Set Up an Asynchronous Worker

1. Objective:

Describe the steps to set up an asynchronous worker in a project. The worker observes a directory for new files and processes them asynchronously.

This can be used to process files in the background, e.g. processing images uploaded by users.

2. Steps:

Step 1: Create a new worker from template.

Create a new worker from the template provided in the project. The worker should be able to observe a directory for new files and process them asynchronously.
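
As a rough illustration of what "observing a directory" could look like, the sketch below polls the watched directory and puts each previously unseen file on an asyncio.Queue. The names scan_new_files and watch_directory and the poll_interval parameter are assumptions made for this example; the project template may use a different mechanism, such as a filesystem-event library. It relies on asyncio.to_thread, so it assumes Python 3.9 or later.

```python
import asyncio
import os


def scan_new_files(directory: str, seen: set[str]) -> list[str]:
    """Return paths of regular files in `directory` not yet recorded in `seen`."""
    new_files = []
    for entry in sorted(os.listdir(directory)):
        path = os.path.join(directory, entry)
        if os.path.isfile(path) and path not in seen:
            seen.add(path)
            new_files.append(path)
    return new_files


async def watch_directory(
    directory: str, queue: asyncio.Queue, poll_interval: float = 1.0
) -> None:
    """Poll `directory` forever and enqueue each new file exactly once."""
    seen: set[str] = set()
    while True:
        # os.listdir blocks, so run the scan in a worker thread.
        for path in await asyncio.to_thread(scan_new_files, directory, seen):
            await queue.put(path)
        await asyncio.sleep(poll_interval)
```

Because paths are remembered in seen, a file that is moved away and later re-created under the same name would not be picked up again by this sketch.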

Step 2: Define the worker's configuration.

Set the number of concurrent tasks that can be processed and the number of asynchronous workers that can be spawned. These parameters are currently hardcoded in the worker script; refactor the code to read them from a configuration file.
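
How the two limits interact depends on the template, which is not shown here. One possible reading, sketched below, is that the worker count sets how many consumer coroutines pull paths from a queue, while the concurrent-task count caps how many of them may run a task at the same moment. The names run_pool and handler are hypothetical, and the handler is assumed to catch its own errors.

```python
import asyncio
from typing import Awaitable, Callable


async def run_pool(
    paths: list[str],
    handler: Callable[[str], Awaitable[None]],
    num_workers: int,
    num_concurrent_tasks: int,
) -> None:
    """Process `paths` with `num_workers` consumers, at most `num_concurrent_tasks` at once."""
    queue: asyncio.Queue = asyncio.Queue()
    for path in paths:
        queue.put_nowait(path)
    limit = asyncio.Semaphore(num_concurrent_tasks)

    async def worker() -> None:
        while True:
            path = await queue.get()
            try:
                async with limit:  # cap the number of tasks running at once
                    await handler(path)
            finally:
                queue.task_done()

    workers = [asyncio.create_task(worker()) for _ in range(num_workers)]
    await queue.join()  # wait until every queued path has been handled
    for task in workers:
        task.cancel()  # consumers loop forever, so stop them explicitly
    await asyncio.gather(*workers, return_exceptions=True)
```

With the values from the template (5 workers, 3 concurrent tasks), the semaphore is the binding limit in this sketch: five consumers wait for work, but only three run a task at any time.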

Modify the following code:

NUM_CONCURRENT_TASKS = 3
NUM_WORKERS = 5
DIRECTORY_TO_WATCH = "./watched_directory"
OUTPUT_DIRECTORY = "./processed_files"

to read the values from a configuration file.
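
A minimal sketch of such a refactoring is shown below. It assumes the configuration file uses lower-case versions of the constant names as keys (num_concurrent_tasks, num_workers, directory_to_watch, output_directory) and keeps the hardcoded values as defaults. WorkerConfig and load_config are hypothetical names. YAML files are parsed with the third-party PyYAML package, which is assumed to be installed; JSON files need only the standard library.

```python
import json
from dataclasses import dataclass
from pathlib import Path


@dataclass(frozen=True)
class WorkerConfig:
    """Worker settings; defaults mirror the previously hardcoded values."""
    num_concurrent_tasks: int = 3
    num_workers: int = 5
    directory_to_watch: str = "./watched_directory"
    output_directory: str = "./processed_files"


def load_config(path: str) -> WorkerConfig:
    """Load worker settings from a .yaml/.yml file, or from JSON otherwise."""
    config_path = Path(path)
    text = config_path.read_text(encoding="utf-8")
    if config_path.suffix in (".yaml", ".yml"):
        import yaml  # third-party PyYAML; assumed to be installed for YAML configs
        raw = yaml.safe_load(text) or {}
    else:
        raw = json.loads(text)
    # Unknown keys raise TypeError, which surfaces typos in the config file early.
    return WorkerConfig(**raw)
```

The worker script could then call load_config once at startup and use, for example, config.num_workers wherever NUM_WORKERS appeared before.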

Step 3: Implement the worker's processing logic.

Implement the worker's processing logic. The worker should take a file from the watched directory as input, perform a task based on the file, and then write a result file to the output directory.

In the template, the worker processes the file by simply waiting for a random amount of time and then moving the file to the output directory.

Rewrite the following code to implement the processing logic:

import asyncio
import os
import random


async def task_coroutine(file_path: str, output_directory: str) -> None:
    """Process a file asynchronously.

    Args:
        file_path: The path to the file to process.
        output_directory: The directory to move the processed file to.
    """
    # Process the file (this could be I/O-bound or CPU-bound work).
    # The template only simulates the work with a random delay.
    sleep_time = random.uniform(3, 7)
    print(f"Processing file: {file_path}, will take {sleep_time:.2f} seconds.")
    await asyncio.sleep(sleep_time)
    print(f"Completed processing file: {file_path}.")

    # Move the file to the output directory.
    # Note: os.rename is a blocking call and fails across filesystems.
    os.rename(file_path, os.path.join(output_directory, os.path.basename(file_path)))

Make sure that the coroutine uses asynchronous I/O operations so that it does not block the event loop.
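
One way to keep blocking file operations off the event loop is to run them in a thread with asyncio.to_thread (Python 3.9 or later). The sketch below is only an illustration: computing a SHA-256 checksum stands in for the real task, and the ".sha256" result-file naming and the helper _checksum_and_move are assumptions. Unlike the template, this version returns the path of the result file.

```python
import asyncio
import hashlib
import os
import shutil


def _checksum_and_move(file_path: str, output_directory: str) -> str:
    """Blocking helper: hash the file, write the result, move the source file."""
    digest = hashlib.sha256()
    with open(file_path, "rb") as source:
        for chunk in iter(lambda: source.read(65536), b""):
            digest.update(chunk)
    os.makedirs(output_directory, exist_ok=True)
    name = os.path.basename(file_path)
    result_path = os.path.join(output_directory, name + ".sha256")
    with open(result_path, "w", encoding="utf-8") as result:
        result.write(digest.hexdigest() + "\n")
    # shutil.move also works across filesystems, where os.rename would fail.
    shutil.move(file_path, os.path.join(output_directory, name))
    return result_path


async def task_coroutine(file_path: str, output_directory: str) -> str:
    """Process one file without blocking the event loop; return the result path."""
    return await asyncio.to_thread(_checksum_and_move, file_path, output_directory)
```

Threads suit I/O-bound work such as reading and moving files. For heavily CPU-bound processing, a process pool used through loop.run_in_executor may be the better fit.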

Step 3.1: Implement any other modifications to the worker.

Implement any other modifications to the worker that are necessary for the project.

For example:

  • add a database connection to the worker.
  • set up more detailed logging.
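
For the logging item, a minimal sketch using the standard logging module might look like the following. The logger name "async_worker" and the function configure_logging are assumptions chosen for this example.

```python
import logging


def configure_logging(level: int = logging.INFO) -> logging.Logger:
    """Attach one timestamped stream handler to the 'async_worker' logger."""
    logger = logging.getLogger("async_worker")  # hypothetical logger name
    logger.setLevel(level)
    if not logger.handlers:  # avoid duplicate handlers when called more than once
        handler = logging.StreamHandler()  # writes to stderr by default
        handler.setFormatter(
            logging.Formatter("%(asctime)s %(levelname)s %(name)s: %(message)s")
        )
        logger.addHandler(handler)
    return logger
```

The print calls in the template could then become calls such as log.info("Processing file: %s", file_path), where log is the logger returned by configure_logging().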

Step 4: Test the worker.

Test the worker by running it and observing the output. The worker should process files asynchronously and create the result files in the output directory.
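
Besides running the worker by hand, a small smoke test can check both expectations automatically. The sketch below assumes the processing coroutine has the template's signature (file_path, output_directory). The names smoke_test and stand_in_task are hypothetical, and stand_in_task only imitates the template's behavior with a short delay.

```python
import asyncio
import os
import tempfile
import time


async def smoke_test(task, file_count: int = 3) -> tuple[list[str], float]:
    """Run `task(file_path, output_directory)` on temp files; return outputs and elapsed time."""
    with tempfile.TemporaryDirectory() as watched, tempfile.TemporaryDirectory() as output:
        paths = []
        for i in range(file_count):
            path = os.path.join(watched, f"sample_{i}.txt")
            with open(path, "w", encoding="utf-8") as handle:
                handle.write("test data\n")
            paths.append(path)
        start = time.perf_counter()
        await asyncio.gather(*(task(path, output) for path in paths))
        elapsed = time.perf_counter() - start
        return sorted(os.listdir(output)), elapsed


async def stand_in_task(file_path: str, output_directory: str) -> None:
    """Placeholder with the template's signature: wait briefly, then move the file."""
    await asyncio.sleep(0.2)
    os.rename(file_path, os.path.join(output_directory, os.path.basename(file_path)))
```

Calling asyncio.run(smoke_test(stand_in_task)) returns the names found in the output directory and the elapsed time. If the files are really processed concurrently, the elapsed time stays close to the duration of a single task rather than the sum of all of them.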

Step 5: Deploy the worker.

Deploy the worker to the production environment.

Step 5.1: Set up the worker as a systemd service.

On Ubuntu, you can set up the worker as a systemd service by creating a service file in /etc/systemd/system/.

For example, create a file named async_worker.service with the following content:

[Unit]
Description=Asynchronous Worker
After=network.target

[Service]
User=<username>
WorkingDirectory=/path/to/worker
ExecStart=/path/to/venv/bin/python worker.py --config ./relative/path/to/config.yaml
Restart=always

[Install]
WantedBy=multi-user.target
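
The ExecStart line above passes a --config option to worker.py. How the template parses it is not shown here, so the following is only a sketch using argparse, with parse_args and resolve_config_path as hypothetical helper names.

```python
import argparse
from pathlib import Path
from typing import Optional, Sequence


def parse_args(argv: Optional[Sequence[str]] = None) -> argparse.Namespace:
    """Parse the worker's command line; `--config` matches the ExecStart line."""
    parser = argparse.ArgumentParser(description="Asynchronous worker")
    parser.add_argument(
        "--config", type=Path, required=True, help="path to the configuration file"
    )
    return parser.parse_args(argv)


def resolve_config_path(config: Path) -> Path:
    """Make a relative path absolute against the current working directory."""
    return config if config.is_absolute() else Path.cwd() / config
```

Under systemd, the process starts in the directory given by WorkingDirectory, so a relative --config path resolves against /path/to/worker rather than the directory the service was started from.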

3. Additional Information:

  • The worker should be able to handle errors gracefully and log them.
  • The worker should be able to recover from failures and continue processing files.
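
One way to approach both points is to wrap each task in a helper that logs failures and retries a limited number of times, so that a single bad file does not stop the worker. The sketch below is an assumption about how this could be done, not part of the template; process_with_retry, max_attempts, and base_delay are hypothetical names.

```python
import asyncio
import logging
from typing import Awaitable, Callable

logger = logging.getLogger("async_worker")  # hypothetical logger name


async def process_with_retry(
    task: Callable[[], Awaitable[None]],
    max_attempts: int = 3,
    base_delay: float = 1.0,
) -> bool:
    """Run `task`, retrying with exponential backoff; return True on success."""
    for attempt in range(1, max_attempts + 1):
        try:
            await task()
            return True
        except Exception:
            # asyncio.CancelledError derives from BaseException (Python 3.8+),
            # so shutdown requests are not swallowed here.
            logger.exception("Attempt %d of %d failed", attempt, max_attempts)
            if attempt < max_attempts:
                await asyncio.sleep(base_delay * 2 ** (attempt - 1))
    return False
```

A caller could use it as await process_with_retry(lambda: task_coroutine(file_path, output_directory)) and, when it returns False, move the file to a separate location for later inspection.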