Emulate Google Cloud Tasks in NestJs and Nx
Scheduling social media posts is at the heart of Social Sprinkler's functionality. It needed a service that gives the publisher control over when tasks execute. Enter Google Cloud Tasks.
Local development became an obstacle because Cloud Tasks has no emulator support like there is for Google Cloud's Pub/Sub product. Publishing directly to Cloud Tasks wasn't optimal because the full flow from publishing to execution couldn't be monitored locally. So what options are available for emulating Google Cloud Tasks?
Overview
This tutorial is for developers using Node / NestJs and Google Cloud Tasks. Familiarity with Nx tools is helpful but not required.
Before You Begin
Clone the NgServe.io sample repository for a full example.
Install the packages
npm install --legacy-peer-deps
Ensure Go is installed https://go.dev/dl/
Goals
By the end, you should be able to create an HTTP-targeted Cloud Task in a local environment.
1. Build and Add Cloud Task Emulator
2. Create the Cloud Tasks Service
3. Publish and Handle Cloud Task Requests
4. Configure the Emulator to Run with Nx Task
5. Test the Publisher
Build and Add Cloud Task Emulator
The Cloud Task emulator was created as an open source project written in Go. Clone the repository below.
Build the emulator from the directory where the repository exists.
go build
Copy the cloud-tasks-emulator binary to the tools/emulators directory of the NgServe.io sample repository.
NOTE - The cloud-tasks-emulator build output may differ depending on your operating system. The emulator that exists in the sample repository was built on a Mac.
In the directory of the NgServe.io repository, run the command to start the Cloud Tasks Emulator.
This starts a single Cloud Task queue on localhost:8123. Multiple queues can be started by adding another -queue option.
With the queues running, how does the queue get configured for local development in code?
Create the Cloud Tasks Service
With the queue running, the @google-cloud/tasks package eases the pain of connecting to the available queues.
If working in the NgServe.io sample repository, this package has already been added.
npm install @google-cloud/tasks --save
Webhook Service Interfaces / Types
More than one kind of webhook service may exist, so generic types are used to accommodate future implementations. A WebhookType<T> is a template for the request being made. The IWebhookService interface applies to any service that makes a request to an external service.
Google Cloud Tasks requires configuration for project, location, and queue. host and port are optional parameters that give flexibility and let the code determine which environment it's running in.
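The types described above could look something like the following. This is only a sketch: the property names on WebhookType<T>, the methods on IWebhookService, and the queuePath and isLocal helpers are assumptions for illustration, and the sample repository's real definitions may differ.

```typescript
// Hypothetical shapes; the sample repository's definitions may differ.

// Generic description of an outbound webhook request with a payload of type T.
interface WebhookType<T> {
  id: string; // unique identifier for the request
  url: string; // endpoint that will receive the request
  payload: T; // body sent to the endpoint
}

// Contract for any service that sends (and can cancel) webhook requests.
interface IWebhookService {
  publish<T>(request: WebhookType<T>): Promise<string>;
  remove(id: string): Promise<void>;
}

// Settings that identify one Cloud Tasks queue.
interface IGoogleCloudTaskConfiguration {
  project: string;
  location: string;
  queue: string;
  host?: string; // set only when targeting the local emulator
  port?: number; // set only when targeting the local emulator
}

// Fully qualified queue path in the format Cloud Tasks expects.
function queuePath(config: IGoogleCloudTaskConfiguration): string {
  return `projects/${config.project}/locations/${config.location}/queues/${config.queue}`;
}

// True when host and port are both present, meaning the emulator is the target.
function isLocal(config: IGoogleCloudTaskConfiguration): boolean {
  return config.host !== undefined && config.port !== undefined;
}
```

Keeping host and port optional means the same configuration type works in both environments, and the presence of the two values is the only switch the code needs.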
The CloudTaskType<T> type extends WebhookType<T> and includes properties unique to Google Cloud Tasks. Cloud Tasks gives the publisher control over when items in the queue are pushed. The scheduleTime property gives that control. If no scheduleTime is provided, the task runs immediately.
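To make the role of scheduleTime concrete, here is a minimal sketch of turning a task into the request object that the @google-cloud/tasks client's createTask method accepts. The CloudTaskType<T> shape, the CreateTaskRequest interface, and the buildCreateTaskRequest helper are assumptions for illustration. The request fields (parent, task.name, task.httpRequest, task.scheduleTime.seconds) follow the Cloud Tasks API.

```typescript
// Hypothetical shape; mirrors the properties named in the text.
interface CloudTaskType<T> {
  id: string;
  url: string;
  payload: T;
  scheduleTime?: Date; // when omitted, the task is dispatched immediately
}

// Subset of the createTask request used for HTTP-targeted tasks.
interface CreateTaskRequest {
  parent: string;
  task: {
    name: string;
    httpRequest: {
      httpMethod: 'POST';
      url: string;
      headers: Record<string, string>;
      body: string; // base64-encoded payload
    };
    scheduleTime?: { seconds: number };
  };
}

// Builds the request for a task on the queue identified by queuePath.
function buildCreateTaskRequest<T>(queuePath: string, task: CloudTaskType<T>): CreateTaskRequest {
  const request: CreateTaskRequest = {
    parent: queuePath,
    task: {
      // The id becomes the last segment of the fully qualified task name.
      name: `${queuePath}/tasks/${task.id}`,
      httpRequest: {
        httpMethod: 'POST',
        url: task.url,
        headers: { 'Content-Type': 'application/json' },
        body: Buffer.from(JSON.stringify(task.payload)).toString('base64'),
      },
    },
  };
  if (task.scheduleTime) {
    // Cloud Tasks expects a timestamp expressed in whole seconds.
    request.task.scheduleTime = { seconds: Math.floor(task.scheduleTime.getTime() / 1000) };
  }
  return request;
}
```

Leaving scheduleTime off the request entirely, rather than sending the current time, keeps the immediate case simple.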
With the models in place, what's the objective of the CloudTaskService?
Create a Service to Manage the Queue
The CloudTaskService creates and configures a new CloudTasksClient. Once the client exists, tasks can be added to and removed from the Cloud Task queue.
For local development, the important part to take note of is in the constructor. When working inside of Google Cloud's infrastructure, service accounts need a role that allows publishing to Cloud Tasks, such as Cloud Tasks Enqueuer. The service assumes that when host and port are both defined, it is executing in a local environment.
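The environment check in the constructor could be sketched as a small function that decides which options to hand to the client. The buildClientOptions name and the ClientOptions subset are assumptions for illustration. servicePath and port are options the @google-cloud/tasks client accepts. When pointing at the emulator, real code would most likely also pass insecure gRPC credentials (for example sslCreds: credentials.createInsecure() from @grpc/grpc-js), which is left out here so the sketch runs without dependencies.

```typescript
interface IGoogleCloudTaskConfiguration {
  project: string;
  location: string;
  queue: string;
  host?: string;
  port?: number;
}

// Subset of the options that would be passed to `new CloudTasksClient(options)`.
interface ClientOptions {
  servicePath?: string;
  port?: number;
}

// Chooses client options based on whether host and port are configured.
function buildClientOptions(config: IGoogleCloudTaskConfiguration): ClientOptions {
  if (config.host !== undefined && config.port !== undefined) {
    // Local emulator: override the endpoint the client connects to.
    // Assumption: real code would add insecure gRPC credentials here as well.
    return { servicePath: config.host, port: config.port };
  }
  // Inside Google Cloud: no overrides, so the client uses its default
  // endpoint and the credentials of the ambient service account.
  return {};
}
```

Requiring both values before switching to local mode avoids a half-configured client that points at the emulator host on the default port.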
Notice the CloudTaskService does not include the @Injectable() decorator. The service is not injected directly anywhere, because multiple queues could exist for different purposes. Social Sprinkler uses different queues for publishing social posts, posts that include media, and email.
How does Social Sprinkler configure the queue?
Instantiating the CloudTaskService
Specific queue services are created through the use of an injection token and a service factory. The responsibility of the service factory is to instantiate the CloudTaskService given a unique configuration for the queue.
The CloudTaskFactory returns a CloudTaskService unique to the IGoogleCloudTaskConfiguration provided. It stores the services in a map, keyed by location_project_queue, so that none is instantiated twice.
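The caching behavior of the factory can be sketched as follows. The CloudTaskService here is a stand-in that only holds its configuration, and the create method name is an assumption. The real service would wrap a Cloud Tasks client.

```typescript
interface IGoogleCloudTaskConfiguration {
  project: string;
  location: string;
  queue: string;
  host?: string;
  port?: number;
}

// Stand-in for the real service, which would wrap a Cloud Tasks client.
class CloudTaskService {
  readonly config: IGoogleCloudTaskConfiguration;

  constructor(config: IGoogleCloudTaskConfiguration) {
    this.config = config;
  }
}

class CloudTaskFactory {
  // One service per queue, keyed by location_project_queue.
  private readonly services = new Map<string, CloudTaskService>();

  // Returns the cached service for the queue, creating it on first use.
  create(config: IGoogleCloudTaskConfiguration): CloudTaskService {
    const key = `${config.location}_${config.project}_${config.queue}`;
    let service = this.services.get(key);
    if (!service) {
      service = new CloudTaskService(config);
      this.services.set(key, service);
    }
    return service;
  }
}
```

Asking the factory twice for the same queue returns the same instance, while a different queue name produces a separate service.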
Injection tokens help configure and inject the correct queue to the consuming service.
The PUBLISHER_QUEUE injection token uses the ConfigService and the CloudTaskFactory to create a unique client for publishing posts.
The PublisherService provides the public API for the consumers to publish to the PUBLISHER_QUEUE.
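The wiring between the token, the factory, and the PublisherService might look roughly like this. To keep the sketch runnable without NestJs installed, ConfigLike, QueueService, and QueueFactory are stand-in types, and the configuration keys (TASKS_PROJECT, TASKS_HOST, and so on) are hypothetical. In a real module the provider would also carry an inject array listing the ConfigService and CloudTaskFactory classes, and the PublisherService constructor parameter would be decorated with @Inject(PUBLISHER_QUEUE).

```typescript
// Stand-ins so the sketch runs without NestJs; all names are hypothetical.
interface ConfigLike {
  get(key: string): string | undefined;
}
interface QueueConfig {
  project: string;
  location: string;
  queue: string;
  host?: string;
  port?: number;
}
interface QueueService {
  publish(id: string): Promise<string>;
}
type QueueFactory = (config: QueueConfig) => QueueService;

const PUBLISHER_QUEUE = 'PUBLISHER_QUEUE';

// Mirrors the shape of a NestJs factory provider: { provide, useFactory }.
const publisherQueueProvider = {
  provide: PUBLISHER_QUEUE,
  useFactory: (config: ConfigLike, createQueue: QueueFactory): QueueService => {
    const port = config.get('TASKS_PORT');
    return createQueue({
      project: config.get('TASKS_PROJECT') ?? 'dev',
      location: config.get('TASKS_LOCATION') ?? 'local',
      queue: config.get('PUBLISHER_QUEUE_NAME') ?? 'publisher',
      host: config.get('TASKS_HOST'),
      port: port === undefined ? undefined : Number(port),
    });
  },
};

// Public API for consumers; it only knows about the injected queue.
class PublisherService {
  private readonly queue: QueueService;

  constructor(queue: QueueService) {
    this.queue = queue;
  }

  publish(id: string): Promise<string> {
    return this.queue.publish(id);
  }
}
```

Because the consumer depends only on the token, a second queue (for email, say) needs nothing more than another token and provider.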
So how do tasks get published?
Publish and Handle Cloud Task Requests
The publisher controller creates a queued task, and the handler processes the request from the queue. When handling requests from the queue, Google Cloud sends specific headers that help validate that the request originated from the Cloud Task queue. In Social Sprinkler's case, Cloud Tasks invokes the handler internally through a permissioned service account with the Cloud Task Invoker role.
The PublisherController exposes an endpoint for publishing to the task queue. It passes along an id that serves as the unique identifier in Cloud Tasks.
The HandlerController is nothing special in this case. Validating the request won't be covered by this example.
With publishing and handling in place, how does local development start the emulator alongside the app that serves the publish requests?
Configure the Emulator to Run with Nx Task
When using an Nx-generated NestJs application, a project.json file allows custom configuration of tasks that are executed against project code through executors.
The serve task for a NestJs application typically uses the @nx/js:node executor to start the application. Spinning up the dependent Cloud Task emulator requires more configuration. The serve task remains, but another task, serve-app, replaces it as the task that starts the app. serve then uses the nx:run-commands executor to run the serve-app task and start the emulator.
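As a rough illustration of that arrangement, the two targets are written below as a TypeScript object that prints the equivalent JSON for the targets section of project.json. Treat the details as assumptions: the emulator path follows the tools/emulators location mentioned earlier, the queue name is made up, the -host and -port flags are assumed from the emulator's usage, and the buildTarget value assumes a build target exists on the project.

```typescript
// Hypothetical targets; paths, queue name, and emulator flags are assumptions.
const targets = {
  // Entry point: runs the emulator and the app side by side.
  serve: {
    executor: 'nx:run-commands',
    options: {
      commands: [
        './tools/emulators/cloud-tasks-emulator -host localhost -port 8123 -queue projects/dev/locations/local/queues/publisher',
        'npx nx run cloud-task-example-api:serve-app',
      ],
      parallel: true, // start both commands at once rather than in sequence
    },
  },
  // The original serve configuration, moved to its own target.
  'serve-app': {
    executor: '@nx/js:node',
    options: {
      buildTarget: 'cloud-task-example-api:build',
    },
  },
};

// Prints the fragment in the JSON form that project.json uses.
console.log(JSON.stringify({ targets }, null, 2));
```

Running in parallel matters here because the emulator never exits on its own, so a sequential run would never reach the command that starts the app.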
Test the Publisher
Run the command below to start the application.
npx nx serve cloud-task-example-api
Make a POST request to http://localhost:3000/api/publisher with the body { "id": "test-id-1234" }. The request returns the Cloud Task id associated with the message that's queued. Since there's no scheduleTime in the example, the request will execute immediately.
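For anyone who prefers a script over a REST client, the request can be sketched with the fetch function built into recent Node versions. The buildPublishRequest and publish helpers are hypothetical names, and the response is read as plain text because the exact response format depends on the controller.

```typescript
interface PublishRequest {
  url: string;
  init: { method: string; headers: Record<string, string>; body: string };
}

// Builds the fetch arguments for the publish request described above.
function buildPublishRequest(id: string): PublishRequest {
  return {
    url: 'http://localhost:3000/api/publisher',
    init: {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({ id }),
    },
  };
}

// Sends the request. This only succeeds while the app and emulator are running.
async function publish(id: string): Promise<string> {
  const { url, init } = buildPublishRequest(id);
  const response = await fetch(url, init);
  if (!response.ok) {
    throw new Error(`Publish failed with status ${response.status}`);
  }
  return response.text();
}
```

With the serve task running, calling publish('test-id-1234').then(console.log) should print whatever the publisher endpoint returns.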
Be warned. I had a bug in my code for Social Sprinkler that connects to YouTube's Data API. The video uploaded successfully, but a validation error kept occurring while saving to the database. The response back to the emulator was an error code, so the request kept retrying until the YouTube API throttled the requests and started denying them for lack of credits. I'm unsure of how to set up the max tries for the emulator. Since the example here is for HTTP tasks, it could be resolved by using App Engine as the task handler and configuring the max retries within code.
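One way to cap retries within code, sketched under assumptions, is to have the handler look at the retry count that Cloud Tasks sends in the X-CloudTasks-TaskRetryCount header and acknowledge the task once a limit is reached. This assumes the emulator forwards that header the way the real service does, which is worth verifying locally. MAX_RETRIES and statusAfterFailure are hypothetical names.

```typescript
// Hypothetical limit on how many retries a failing task is allowed.
const MAX_RETRIES = 3;

// Decides which HTTP status a handler should return after a failed attempt.
// Header names are expected in lowercase, as Node provides them.
function statusAfterFailure(headers: Record<string, string | undefined>): number {
  const retryCount = Number(headers['x-cloudtasks-taskretrycount'] ?? '0');
  // A 2xx response marks the task as complete, so the queue stops retrying it.
  if (Number.isFinite(retryCount) && retryCount >= MAX_RETRIES) {
    return 200;
  }
  // Any other failure keeps the normal behavior: report an error and retry.
  return 500;
}
```

Acknowledging a failed task this way drops it silently, so logging the failure before returning the 2xx status would be a sensible addition.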
Conclusion
Google Cloud Tasks gives the publisher control of when messages should get processed, but it doesn't ship with an emulator for local testing.
Setting up the Cloud Task emulator requires building an open source Go application that mimics queues. Publishing tasks directly to a real queue during development could cause request handlers to run unexpectedly.
The built Cloud Task emulator runs alongside the service that publishes to the queue. This is done through an Nx task that uses the nx:run-commands executor to run the emulator and the service in parallel.