Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parametrized queue names #2079

Open
1 task done
akwodkiewicz opened this issue Apr 26, 2024 · 1 comment
Open
1 task done

Parametrized queue names #2079

akwodkiewicz opened this issue Apr 26, 2024 · 1 comment
Labels

Comments

@akwodkiewicz
Copy link

akwodkiewicz commented Apr 26, 2024

Is there an existing issue that is already proposing this?

  • I have searched the existing issues

Is your feature request related to a problem? Please describe it

I'm trying to create a basic module that serves as a BullMQ infra layer. The whole premise of the module is to create a facade over BullMQ API to create messages and to register message consumers. The design of the feature is for the module's main provider class to create a single "master queue" on BullMQ and register itself as its Worker/Processor. "Processing" the messages would be just passing the messages to the actual, previously registered consumers of particular message types.

Now, since I want to reuse the same module in various apps in the monorepo, I need to be able to parametrize the "master queue" name, so that several apps that use the same Redis instance, will not read from/write to the same queue.

I think this is not possible at the moment with the @nestjs/bullmq declarative API (@InjectQueue/@Processor).

Injecting a parametrized queue

Although I can @Inject a custom string value (like a queue name) to the provider, I cannot use it inside the @InjectQueue() decorator (@InjectQueue(@Inject(QUEUE_NAME)) <- not possible).

However, I was able to overcome this by cutting the middle man, and instead of using the @InjectQueue decorator, I used the getQueueToken. This let me inject the queue myself in my module. I had to alias the "native" queue token with a custom const token using useExisting, so I could then reference the const token in the @Inject decorator in the provider:

/// my.module.ts
class MyModule {
  static forRoot(customQueueName: string) {
    return {
        ...
        provide: [
          {
            provide: 'MY_CUSTOM_QUEUE_DI_TOKEN',
            useExisting: getQueueToken(customQueueName),
          },
        ]
     };
   }
}
/// my.provider.ts
class MyProvider {
  constructor(
        @Inject('MY_CUSTOM_QUEUE_DI_TOKEN') private readonly queue: Queue,
  ){}
}

Declaring a parametrized queue Processor

Unfortunately I cannot find a similar approach that would let me parametrize a @Processor(). I cannot decorate the class with a queue name before DI injects all the necessary things, and that is probably already too late.

Describe the solution you'd like

The "parametrized queues" feature should consist of 2 parts:

  • ability to inject a parametrized queue into a class
  • ability to make a class a Processor of a parametrized queue

The first point is possible even today with an injection trick, but the second point is not possible due to the nature of class decorators.

Maybe it would be possible to make a class a Processor without the need of decorating the class in the first place? Or make the queue name be passed in a different way?

Teachability, documentation, adoption, migration strategy

No response

What is the motivation / use case for changing the behavior?

I need to be able to parametrize the queue name in my BullMQ-powered module so that several apps that use the same Redis instance and not read from/write to the same queue.

@akwodkiewicz
Copy link
Author

akwodkiewicz commented Apr 26, 2024

I was able to do the part 2 as well by decorating the class with proper metadata myself in its constructor (thx @quezak for suggesting that).

But since the metadata keys are not exported, I had to write the string myself and it's a risk it will stop working with next updates, since it's not a part of public API

class MyModule {
  static forRoot(customQueueName: string) {
    return {
        ...
        provide: [
          {
            provide: 'MY_CUSTOM_QUEUE_DI_TOKEN',
            useExisting: getQueueToken(customQueueName),
          },
          {
            provide: 'MY_CUSTOM_QUEUE_NAME_DI_TOKEN',
            useValue: customQueueName,
          }
        ]
     };
   }
}

class MyProvider extends WorkerHost {
   constructor(
        @Inject('MY_CUSTOM_QUEUE_DI_TOKEN')      queue:     Queue,
        @Inject('MY_CUSTOM_QUEUE_NAME_DI_TOKEN') queueName: string,
    ) {
        super();
        // solution to problem no. 2 below:
        SetMetadata(SCOPE_OPTIONS_METADATA, { name: queueName })(MyProvider);
        SetMetadata('bullmq:processor_metadata', { name: queueName })(MyProvider);
        //           ^
        //           this is `PROCESSOR_METADATA` in the lib
    }

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

1 participant