Emulate Google Cloud Tasks in NestJs and Nx

Scheduling social media posts is at the heart of Social Sprinkler's functionality. It needed a service that lets the publisher control when tasks execute. Enter Google Cloud Tasks.

Development became an obstacle because there's no emulator support like there is for Google Cloud's Pub/Sub product. Publishing directly to Cloud Tasks wasn't optimal because the full flow, from publishing to execution, couldn't be monitored. So what options are available for emulating Google Cloud Tasks?

Overview

This tutorial is for developers using Node / NestJs and Google Cloud Tasks. Familiarity with Nx tools is helpful but not required.

Before You Begin

Clone the NgServe.io sample repository for a full example.

Install the packages

npm install --legacy-peer-deps

Ensure Go is installed https://go.dev/dl/

Goals

The end state should be able to create an HTTP targeted Cloud Task in a local environment.

1. Build and Add Cloud Task Emulator

2. Create the Cloud Tasks Service

3. Publish and Handle Cloud Task Requests

4. Configure the Emulator to Run with Nx Task

5. Test the Publisher

Build and Add Cloud Task Emulator

The Cloud Task emulator is an open source project written in Go. Clone the repository below.

aertje/cloud-tasks-emulator on GitHub: https://github.com/aertje/cloud-tasks-emulator

Build the emulator from the directory where the repository exists.

go build

Copy the cloud-tasks-emulator binary to the tools/emulators directory of the NgServe.io sample repository.

NOTE - The built cloud-tasks-emulator binary differs by operating system. The emulator that exists in the sample repository was built on a Mac.

Cloud Tasks Emulator in the NgServe.io tools/emulators directory

In the directory of the NgServe.io repository, run the command to start the Cloud Tasks Emulator.

This starts a single Cloud Task queue on localhost:8123. Multiple queues can be started by adding another -queue option for each one.
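The start command itself is not reproduced here. As a rough sketch, assuming the emulator accepts the -host, -port, and repeatable -queue flags described in its README, and that queue names follow the projects/<project>/locations/<location>/queues/<queue> format used by Cloud Tasks, the arguments could be assembled as follows. The project, location, and queue names (dev, here, publisher, email) are hypothetical, and the example starts two queues to show the repeated -queue option.

```typescript
// Sketch only: builds the argument list for the emulator binary.
// The -host, -port and -queue flags are assumed from the emulator's README;
// check `cloud-tasks-emulator --help` for the build you are running.
interface EmulatorOptions {
  host: string;
  port: number;
  queues: string[]; // fully qualified queue names
}

// Fully qualified queue name in the format Cloud Tasks uses.
function queueName(project: string, location: string, queue: string): string {
  return `projects/${project}/locations/${location}/queues/${queue}`;
}

function emulatorArgs(options: EmulatorOptions): string[] {
  const result = ['-host', options.host, '-port', String(options.port)];
  // One -queue flag per queue to create on startup.
  for (const queue of options.queues) {
    result.push('-queue', queue);
  }
  return result;
}

// Hypothetical project, location and queue names.
const emulatorCommand =
  './tools/emulators/cloud-tasks-emulator ' +
  emulatorArgs({
    host: 'localhost',
    port: 8123,
    queues: [queueName('dev', 'here', 'publisher'), queueName('dev', 'here', 'email')],
  }).join(' ');
```

The resulting string is what would be typed in a terminal from the repository root, or placed in a script.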

With the queues running, how does the queue get configured for local development in code?

Create the Cloud Tasks Service

With the queue running, the @google-cloud/tasks package eases the pain of connecting to the available queues.

If you are working in the NgServe.io sample repository, this package has already been added.

npm install @google-cloud/tasks --save

Webhook Service Interfaces / Types

More than one kind of webhook service may exist, so generic types are used to accommodate future implementations. A WebhookType<T> is a template for the request being made. The IWebhookService applies to a service that will make a request to an external service.

Google Cloud Tasks requires configuration for project, location, and queue. host and port are optional parameters that give flexibility and allow the code to determine which environment it's running in.

The CloudTaskType<T> includes properties unique to Google Cloud Tasks and extends the WebhookType<T> type. Cloud Tasks gives the publisher control over when items in the queue are pushed. The scheduleTime property gives that control. If no scheduleTime is provided, the task runs immediately.
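The type definitions are not shown in the text, so here is one possible shape for them, reconstructed from the descriptions above. Only project, location, queue, host, port, and scheduleTime come from the article; every other field name (url, payload, headers, id) and the create/delete method names are assumptions made for illustration.

```typescript
// Hypothetical shapes reconstructed from the prose. Field names other than
// project, location, queue, host, port and scheduleTime are assumptions.

// Template for a request made to an external service.
interface WebhookType<T> {
  url: string; // where the request is sent
  payload: T; // body of the request
  headers?: Record<string, string>;
}

// A service that makes (or schedules) requests to an external service.
interface IWebhookService<TRequest extends WebhookType<unknown>> {
  create(request: TRequest): Promise<string>; // resolves to an id for the request
  delete(id: string): Promise<void>;
}

interface IGoogleCloudTaskConfiguration {
  project: string;
  location: string;
  queue: string;
  host?: string; // set only when targeting the local emulator
  port?: number;
}

// Adds the Cloud Tasks specific properties.
interface CloudTaskType<T> extends WebhookType<T> {
  id?: string; // optional task id
  scheduleTime?: Date; // omit to run the task immediately
}

// The Cloud Tasks API takes scheduleTime as a protobuf Timestamp, whose
// seconds field counts whole seconds since the Unix epoch.
function toScheduleSeconds(task: CloudTaskType<unknown>): number | undefined {
  return task.scheduleTime
    ? Math.floor(task.scheduleTime.getTime() / 1000)
    : undefined;
}
```

Keeping scheduleTime as a Date in the application type and converting at the boundary keeps the Cloud Tasks wire format out of the calling code.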

With the models in place, what's the objective of the CloudTaskService?

Create a Service to Manage the Queue

The CloudTaskService creates and configures a new CloudTasksClient. Once the client exists, tasks can be created in and removed from the Cloud Task queue.

For local development, the important part to take note of is in the constructor. When working inside of Google Cloud's infrastructure, service accounts need to be configured for the Cloud Task publisher role. The service assumes that when the host and port are defined, it is executing in a local environment.
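A minimal sketch of that constructor decision, written as a standalone function so it can be checked without a queue. servicePath and port are options the CloudTasksClient constructor accepts; the function names isLocal and clientOptions are made up for this example, and the sslCreds line in the comment follows the Node.js example in the emulator's README rather than anything shown in this article.

```typescript
// Minimal stand-in for the configuration described earlier.
interface IGoogleCloudTaskConfiguration {
  project: string;
  location: string;
  queue: string;
  host?: string;
  port?: number;
}

// Subset of the client options used in this sketch.
interface ClientOptions {
  servicePath?: string;
  port?: number;
}

// host and port both defined => assume the local emulator.
function isLocal(config: IGoogleCloudTaskConfiguration): boolean {
  return config.host !== undefined && config.port !== undefined;
}

function clientOptions(config: IGoogleCloudTaskConfiguration): ClientOptions {
  if (!isLocal(config)) {
    // Inside Google Cloud the client picks up the service account
    // credentials from the environment, so no overrides are needed.
    return {};
  }
  // The emulator's README additionally passes
  //   sslCreds: credentials.createInsecure()   (from @grpc/grpc-js)
  // because the emulator does not serve TLS.
  return { servicePath: config.host, port: config.port };
}

// Usage with the real client would look roughly like:
//   const client = new CloudTasksClient(clientOptions(config));
```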

Notice the CloudTaskService does not include the @Injectable() decorator. The service does not get injected anywhere directly, because multiple queues could exist for different purposes. Social Sprinkler uses different queues for publishing social posts, posts that include media, and email.
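The service code is not reproduced here, so the following is only a sketch of how such a class might look. To keep it self-contained, it depends on a hypothetical TasksClientLike interface instead of importing @google-cloud/tasks. The real CloudTasksClient has createTask and deleteTask methods, but createTask resolves to an array whose first element is the task, so a thin adapter is assumed. The CloudTaskInput type and the buildRequest helper are also assumptions.

```typescript
// Hypothetical input type; see the earlier type descriptions.
interface CloudTaskInput<T> {
  id?: string;
  url: string;
  payload: T;
  scheduleTime?: Date;
}

interface TaskBody {
  name?: string;
  scheduleTime?: { seconds: number };
  httpRequest: {
    url: string;
    httpMethod: 'POST';
    headers: Record<string, string>;
    body: Uint8Array;
  };
}

// Assumed adapter around CloudTasksClient (not the library's own type).
interface TasksClientLike {
  createTask(request: { parent: string; task: TaskBody }): Promise<{ name: string }>;
  deleteTask(request: { name: string }): Promise<unknown>;
}

class CloudTaskService<T> {
  private readonly client: TasksClientLike;
  private readonly parent: string;

  constructor(client: TasksClientLike, config: { project: string; location: string; queue: string }) {
    this.client = client;
    this.parent = `projects/${config.project}/locations/${config.location}/queues/${config.queue}`;
  }

  // Pure helper so the request shape can be checked without a queue.
  buildRequest(task: CloudTaskInput<T>): { parent: string; task: TaskBody } {
    const body: TaskBody = {
      httpRequest: {
        url: task.url,
        httpMethod: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: new TextEncoder().encode(JSON.stringify(task.payload)),
      },
    };
    if (task.id) {
      body.name = `${this.parent}/tasks/${task.id}`;
    }
    if (task.scheduleTime) {
      body.scheduleTime = { seconds: Math.floor(task.scheduleTime.getTime() / 1000) };
    }
    return { parent: this.parent, task: body };
  }

  async create(task: CloudTaskInput<T>): Promise<string> {
    const created = await this.client.createTask(this.buildRequest(task));
    return created.name;
  }

  async delete(id: string): Promise<void> {
    await this.client.deleteTask({ name: `${this.parent}/tasks/${id}` });
  }
}
```

Passing the client in, rather than constructing it inside the class, is a choice made for this sketch so the request-building logic can be exercised with a fake client in unit tests.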

How does Social Sprinkler configure the queue?

Instantiating the CloudTaskService

Specific queue services are created through the use of an injection token and a service factory. The responsibility of the service factory is to instantiate the CloudTaskService given a unique configuration for the queue.

The CloudTaskFactory returns a CloudTaskService unique to the IGoogleCloudTaskConfiguration provided. To avoid reinstantiating clients, it stores them in a map under a unique key of the form location_project_queue.
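The caching behavior can be sketched as below. The class is generic over the service type and takes a build callback so the example stays self-contained; in the setup described above, the callback would construct a CloudTaskService. The callback parameter and the create method name are assumptions.

```typescript
interface IGoogleCloudTaskConfiguration {
  project: string;
  location: string;
  queue: string;
  host?: string;
  port?: number;
}

class CloudTaskFactory<TService> {
  // One service instance per location_project_queue key.
  private readonly services = new Map<string, TService>();
  private readonly build: (config: IGoogleCloudTaskConfiguration) => TService;

  constructor(build: (config: IGoogleCloudTaskConfiguration) => TService) {
    this.build = build;
  }

  create(config: IGoogleCloudTaskConfiguration): TService {
    const key = `${config.location}_${config.project}_${config.queue}`;
    let service = this.services.get(key);
    if (service === undefined) {
      // First request for this queue: build and remember the service.
      service = this.build(config);
      this.services.set(key, service);
    }
    return service;
  }
}
```

Asking the factory twice for the same queue returns the same instance, while a different queue name yields a new one.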

Injection tokens help configure and inject the correct queue to the consuming service.

The PUBLISHER_QUEUE injection token uses the ConfigService and the CloudTaskFactory to create a unique client for publishing posts.

The PublisherService provides the public API for the consumers to publish to the PUBLISHER_QUEUE.
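As an illustration of how the token and factory might be wired together, here is a sketch shaped like a NestJS factory provider ({ provide, useFactory, inject }). It avoids importing NestJS so it can run on its own: ConfigLike and FactoryLike are hypothetical stand-ins for ConfigService and CloudTaskFactory, and the configuration keys and default values are invented for the example.

```typescript
// Injection token for the publishing queue.
const PUBLISHER_QUEUE = 'PUBLISHER_QUEUE';

interface QueueConfig {
  project: string;
  location: string;
  queue: string;
  host?: string;
  port?: number;
}

// Hypothetical stand-ins declaring only the members used below.
interface ConfigLike {
  get(key: string): string | undefined;
}
interface FactoryLike<TService> {
  create(config: QueueConfig): TService;
}

function publisherQueueProvider<TService>() {
  return {
    provide: PUBLISHER_QUEUE,
    // In a real NestJS module this object would also carry
    //   inject: [ConfigService, CloudTaskFactory]
    useFactory: (config: ConfigLike, factory: FactoryLike<TService>): TService => {
      // Configuration keys and defaults are assumptions for this sketch.
      const port = config.get('CLOUD_TASKS_PORT');
      return factory.create({
        project: config.get('CLOUD_TASKS_PROJECT') ?? 'dev',
        location: config.get('CLOUD_TASKS_LOCATION') ?? 'here',
        queue: config.get('PUBLISHER_QUEUE_NAME') ?? 'publisher',
        host: config.get('CLOUD_TASKS_HOST'),
        port: port === undefined ? undefined : Number(port),
      });
    },
  };
}
```

A consuming service such as the PublisherService would then receive the queue through @Inject(PUBLISHER_QUEUE) in its constructor.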

So how do tasks get published?

Publish and Handle Cloud Task Requests

The publisher controller will create a queued task and the handler will process the request from the queue. When handling requests from the queue, Google Cloud sends specific headers to help validate that the request originated from the Cloud Task queue. In Social Sprinkler's case, Cloud Tasks are invoked internally by a permissioned service account with the Cloud Task Invoker role.

The PublisherController opens the ability to publish to the task queue. It passes along an id that serves as the unique identifier in Cloud Tasks.

The HandlerController is nothing special in this case. Validating the request won't be covered by this example.
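Although validation is out of scope, it can still help to see what the handler receives. Cloud Tasks adds X-CloudTasks-* headers to HTTP target requests, and the sketch below reads a few of them from a plain headers object. The function and field names are made up for this example, whether the emulator sends every one of these headers should be verified locally, and reading the headers is not a substitute for authenticating the caller.

```typescript
interface CloudTaskHeaders {
  queueName?: string;
  taskName?: string;
  retryCount?: number;
  executionCount?: number;
}

// Accepts the header map shape Node's http module exposes, where header
// names are lower-cased and values may be arrays.
function readCloudTaskHeaders(
  headers: Record<string, string | string[] | undefined>,
): CloudTaskHeaders {
  const first = (key: string): string | undefined => {
    const value = headers[key];
    return Array.isArray(value) ? value[0] : value;
  };
  const toNumber = (value: string | undefined): number | undefined =>
    value === undefined ? undefined : Number(value);

  return {
    queueName: first('x-cloudtasks-queuename'),
    taskName: first('x-cloudtasks-taskname'),
    // 0 on the first attempt, incremented on each retry.
    retryCount: toNumber(first('x-cloudtasks-taskretrycount')),
    executionCount: toNumber(first('x-cloudtasks-taskexecutioncount')),
  };
}
```

Logging these values in the handler makes it easier to see which queue and task triggered a request while developing locally.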

With publishing and handling in place, how does local development start the emulator alongside serving the publish requests?

Configure the Emulator to Run with Nx Task

When using an Nx generated NestJs application, a project.json allows custom configuration of tasks to be executed against project code through executors.

Related reading: How to Write an Nx Executor for Building Docker Images

The serve task for a NestJs application typically uses the @nx/js:node executor to start the application. Spinning up the dependent Cloud Task emulator requires more configuration. The serve task remains, but a new serve-app task takes over as the task that starts the app. serve then uses the nx:run-commands executor to run the serve-app task and start the emulator.
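The project.json is not reproduced here, so the fragment below is only a guess at its general shape. It assumes the project is named cloud-task-example-api, that the emulator binary lives in tools/emulators, that the emulator accepts -host, -port, and -queue flags, and that a build target exists. The queue name is hypothetical.

```json
{
  "targets": {
    "serve-app": {
      "executor": "@nx/js:node",
      "options": {
        "buildTarget": "cloud-task-example-api:build"
      }
    },
    "serve": {
      "executor": "nx:run-commands",
      "options": {
        "parallel": true,
        "commands": [
          "nx run cloud-task-example-api:serve-app",
          "./tools/emulators/cloud-tasks-emulator -host localhost -port 8123 -queue projects/dev/locations/here/queues/publisher"
        ]
      }
    }
  }
}
```

With parallel set to true, nx:run-commands starts both commands at once instead of waiting for the first to exit, which is what a long-running server and a long-running emulator need.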

Test the Publisher

Run the command below to start the application.

npx nx serve cloud-task-example-api

The Emulator and Service Running

Make a POST request to http://localhost:3000/api/publisher with the body { "id": "test-id-1234" }. The request returns the Cloud Task id associated with the message that's queued. Since there's no scheduleTime in the example, the request will execute immediately.

The Task Handled and the task being marked as complete

Be warned. I had a bug in my Social Sprinkler code that connects to YouTube's Data API. The video uploaded successfully, but a validation error kept occurring while saving to the database, so the response back to the emulator was an error code. The emulator kept retrying the request until the YouTube API throttled the requests and started denying them for lack of credits. I'm unsure how to set the maximum number of retries for the emulator. Since the example here is for HTTP tasks, it could be resolved by using App Engine as the task handler and configuring the max retries within code.
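One handler-side way to stop a runaway retry loop during local development, offered as a sketch and not as what Social Sprinkler does: decide the response status from the retry count. It assumes the retry count is read from the X-CloudTasks-TaskRetryCount header (0 on the first attempt), that the emulator sends that header, and that any 2xx response marks the task as done, as it does in Cloud Tasks. The function name and the retry limit are made up.

```typescript
// Returns the HTTP status the handler should respond with.
// A 2xx response completes the task; anything else triggers a retry.
function statusForAttempt(
  succeeded: boolean,
  retryCount: number, // value of X-CloudTasks-TaskRetryCount, 0 on first attempt
  maxRetries: number, // hypothetical limit chosen by the handler
): number {
  if (succeeded) {
    return 200;
  }
  if (retryCount >= maxRetries) {
    // Out of retries: acknowledge the task so it is not delivered again.
    // A real handler would log or store the failure before giving up.
    return 200;
  }
  return 500;
}
```

With maxRetries set to 3, a task that always fails is attempted four times in total (the first attempt plus three retries) and is then acknowledged.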

Conclusion

Google Cloud Tasks gives the publisher control of when messages should get processed, but it doesn't ship with an emulator for local testing.

Setting up the Cloud Task emulator requires a third-party Go application to mimic queues. Creating tasks directly on the real queue could cause request handlers to run unexpectedly.

The built Cloud Task emulator runs alongside a service that publishes to the queue. This is done through an Nx task that uses the nx:run-commands executor to run the emulator and the service in parallel.