Skip to content

Latest commit

History

History
146 lines (119 loc) 路 5.36 KB

README.md

File metadata and controls

146 lines (119 loc) 路 5.36 KB

AMQ RPC

semantic-release Conventional Changelog FlowJS Build Status Coverage Status

Greenkeeper badge dependencies Status devDependencies Status

npm node MIT License

NPM

Attention, module currently in active development 鈿狅笍
Soon to be released, maybe around 15 october 2018 馃枛

Samples

Client:

import { RpcClient } from 'amq-rpc';

(async () => {
  const client = new RpcClient({
    service: 'my-awesome-service',
    version: '1.2',
    connectParams: {
      url: 'amqp://guest:guest@localhost:5672/?vhost=/',
      heartbeat: 30,
    },
    waitResponseTimeout: 30 * 1000, // timeout for wait result from service
  });

  await client.ensureConnection(); // accept in first param object as connectParams in constructor

  const result = await client.send({ foo: 'bar' }, { correlationId: 'e.g. nginx req id' });
  const result2 = await client.call('myAction', { foo: 'bar' }, { correlationId: 'e.g. nginx req id' });

  await client.destroy();
})().catch(err => console.error(err) || process.exit(1));

Service:

import { RpcService, RpcServiceHandler } from 'amq-rpc';

(async () => {
  const service = new RpcService({
    service: 'my-awesome-service',
    version: '1.2',
    connectParams: {
      url: 'amqp://guest:guest@localhost:5672/?vhost=/',
      heartbeat: 30,
    },
    queue: {
      prefetch: 1,
      durable: true,
      maxPriority: 100,
    },
  });
  
  service.setErrorHandler((error) => {
    // All errors, which can't passed to reject operation (as error in subscriber function,
    // outside of user handler), will be passed to this callback.
  });
  
  await service.addHandler(class extends RpcServiceHandler {
    // If in message "type" property didn't fill (send without special options),
    // service will find handler with action 'default' 
    get action() {
      // in base class, RpcServiceHandler, action equal to 'default'
      return 'myAction2';
    }
  
    async beforeHandle() {
      // called nearly before handle method
      // use it for prepare data, init resources or logging
      // all throwed errors, as in handle method passed to handleFail method
    }

    // 鈿狅笍 you must redefine this method from RpcServiceHandler class
    async handle() {
      // this.payload - sended payload
      // this.context - special object, shared between methods. By default equal to {}.
      // returned data passed to client as reply payload
      return { bar: 'foo' };
    }

    // 鈿狅笍 redefine this method only if you know what you do
    async handleFail(error: Error) {
      /*
        In base class, RpcServiceHandler:
         - reject message in queue
         - reply to client error with messageId and correlationId
       */
      // you can redefine and customize error handling behavior 
    }

    // 鈿狅笍 redefine this method only if you know what you do
    async handleSuccess(replyPayload: Object) {
      /*
        In base class, RpcServiceHandler:
         - ack message in queue
         - reply to client with payload and error: null
       */
      // you can redefine and customize success handling behavior 
    }

    async onFail(error: Error) {
      // hook for logging
    }

    async onSuccess(replyPayload: Object) {
      // hook for logging
    }

    async afterHandle(error: ?Error, replyPayload: ?Object) {
      // if current handler failed, error passed in first argument
      // if success handling, replyPayload passed as second argument
      // use it for logging or deinit resouces
      // wrap this code in try..catch block, because all errors from afterHandle method just 
      // pass to error handler callback
    }
  });

  // Minimal handler
  await service.addHandler(class extends RpcServiceHandler {
    async handle() {
      return { bar: `${this.payload.foo} 42` };
    }
  });
  
  await service.ensureConnection();
  
  // If process receive SIGINT, service will be gracefully stopped
  // (wait for handler end work until timeout exceeded and then call for process.exit())
  await service.interventSignalInterceptors({ stopSignal: 'SIGINT', gracefulStopTimeout: 10 * 1000 });
})().catch(err => console.error(err) || process.exit(1));