PollMatrix (cancelable polling by endpoint name and parameters) #10

Open
VldMrgnn opened this issue May 9, 2022 · 17 comments

@VldMrgnn
Contributor

VldMrgnn commented May 9, 2022

Hello, congrats and thanks for the great work.

I wonder if it is possible to somehow pass the cancelType string for polling from the dispatch?

Something like dispatch(loadSomeWork({ userID: 5 }, { poll: 30 * 1000, cancelType: 'userId5' }))?

Or from within the api definition, or some workaround to be able to set distinct polling requests (e.g. byUserId) and cancel individually by the parameters, on the same endpoint.

Thanks, Vlad

@neurosnap
Member

Greetings! Thanks for trying the library out.

I wonder if is possible to somehow pass the cancelType string for polling from the dispatch ?

That's certainly possible, but I'd like to learn more about your use-case. It sounds like you need a poll for every individual user. I think this is an important scenario that we should think about how to solve across the board. Since saga-query endpoints are per resource we don't have a built-in way to handle when you need a loader or a poll per resource id.

In the meantime, you're free to build your own poll middleware and use that. The source is linked below:

https://github.com/neurosnap/saga-query/blob/main/src/middleware.ts#L200

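import { SagaIterator } from 'redux-saga';
import { call, delay, race, take } from 'redux-saga/effects';

export function poll(parentTimer?: number, cancelType?: string) {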
  return function* poller(
    actionType: string,
    saga: any,
    ...args: any[]
  ): SagaIterator<void> {
    const cancel = cancelType || actionType;
    function* fire(action: { type: string }, timer: number) {
      while (true) {
        yield call(saga, action, ...args);
        yield delay(timer);
      }
    }

    while (true) {
      const action = yield take(`${actionType}`);
      const timer = action.payload?.timer || parentTimer;
      yield race([call(fire, action, timer), take(`${cancel}`)]);
    }
  };
}

I'd also be happy to accept any PRs that you do make (like adding cancelType to the action payload).

What you could do immediately is something like this:

while (true) {
  const action = yield take(`${actionType}`);
  const timer = action.payload?.timer || parentTimer;
  const cancelPoll = action.payload?.cancelType || cancel;
  yield race([call(fire, action, timer), take(`${cancelPoll}`)]);
}

If you have any questions or feedback I'd be happy to listen.

@VldMrgnn
Contributor Author

VldMrgnn commented May 9, 2022

Thank you, I'll go further with your suggestion and I think that will do it,

Anyway I wrote down the use-case.
A better example would be about "items".
The use-case is as follows.

On client-side :

0 - the user selects some ( multiple ) ITEMS to be calculated (ITEMID), by clicking on a tree-view.
(for each ITEM we can have many underlying records of data - from zero to few hundreds)
1 - the software will display some initially pre-calculated values for each of the selected ITEM and the total
(for each ITEM selected there are different and complex rules to be applied on a dataset)
2 - for each ITEM selected, a request is made to the node.js server to fetch that underlying dataset.
(the dataset could change in-between polls. At this phase there is no locking)
the request has parameters: ITEMID , hash-sum (initially null) etc.

On the server (Node.js Express Redis) - on poll initial

4 - on request by ITEMID, some possibly heavy selections are made from Redis tables, resulting in a dataset.
A hash-sum is computed from some fields of this dataset.
5 - the response contains the dataset and the hash-sum (~ the subsidiaries of ITEMID and the CRC).

on client
6 - we send the dataset to be processed to sagas, and then to reducer. (other than the cache)
7 - we keep in cache a record: [ITEMID]:hash-sum

8 - as long as the ITEM is in process on client-side we poll the server to see if there are any changes in the underlying dataset.

on server on poll
9 - we compare the current and the previous hash-sum for the given ITEMID.
a. if it has changed, we return the dataset and the new hash-sum.
b. if the hash-sum is the same, we return status(208) with an empty body.

on client
10 - a. if receiving new data then replace hash-sum in cache and do further calculation in saga
b. if receiving 208 then exit

The user could select multiple ITEMIDs or just one.
Different algorithms apply when they are combined.
The calculations could be complex.

I.E. we subscribe/unsubscribe to a polling by ITEMID request.
As long as there is no new data we are able to ignore the request.

Ideally we would like to pause the polling and resume.
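
The client side of steps 7 to 10 above could be sketched roughly as follows. This is only an illustration of the flow described in the use-case: the names `PollResponse`, `HashCache` and `applyPollResponse` are hypothetical, and the response shapes are assumptions, since the thread does not show the real payloads.

```typescript
// Hypothetical response shapes; the real payloads are not shown in the thread.
type PollResponse<T> =
  | { status: 200; hash: string; dataset: T } // the dataset changed (step 9a)
  | { status: 208 }; // same hash-sum, nothing to do (step 9b)

// Cache of ITEMID -> last hash-sum seen (step 7).
type HashCache = Map<string, string>;

// Returns the new dataset when something changed,
// or null when the poll result can be ignored.
function applyPollResponse<T>(
  cache: HashCache,
  itemId: string,
  res: PollResponse<T>,
): T | null {
  if (res.status === 208) return null; // step 10b: exit
  cache.set(itemId, res.hash); // step 10a: replace the hash-sum in the cache
  return res.dataset; // hand the dataset over for further calculation
}

const hashCache: HashCache = new Map();
const first = applyPollResponse(hashCache, "42", {
  status: 200,
  hash: "abc",
  dataset: [1, 2],
});
const second = applyPollResponse(hashCache, "42", { status: 208 });
```

In a saga, the non-null branch is where the dataset would be passed on to the reducer, while the null branch simply waits for the next poll.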

@VldMrgnn
Contributor Author

VldMrgnn commented May 10, 2022

It seems that the task isn't trivial at all. Sorry for the irrelevant details above. Looking forward to the ongoing PR.

@neurosnap
Member

Is there anything I can help with?

@VldMrgnn
Contributor Author

For now I implemented this feature using another approach than saga-query in order to move on with the project (mixing valtio proxy, rtkq and sagas to make the feature fit in the project).
Still, I will continue adopting saga-query, as I consider it an elegant solution in line with the core concepts of our architecture.
I really hope that this implementation will become available, and I am thinking of a more general implementation of this concept. Even if polling doesn't seem like such a big deal, it could have a big impact on user experience and on the developer's approach as well.
I think that the cache shouldn't stop its logic on the client but should propagate its validation to the server.
Personally, I embrace the same ideas you described in your blog regarding the logic in sagas and redux as tables. I see selectors as queries and sagas as procedures.
In the past two years of working with redux-saga I had to make up some basic tooling and techniques to solve fetching, caching, indexing and additional processing. So I wrote a wrapper around useSWR and some hooks, some workers and so on. It's pretty messy but it works, so I am happy to try to refactor and put things on track. I find it extremely useful to have the middleware approach inside the saga-fetch. I am still discovering how things work, but I can tell that the library is what I wished I'd had when I started.

@neurosnap
Member

Reading your use-case again, it seems like you want to create a poll per ITEMID and be able to cancel the poll per ITEMID, is that right?

@VldMrgnn
Contributor Author

Yes. That is correct.

@VldMrgnn
Contributor Author

I think the problem is that the "name" is untouchable. Initially I thought that cancelling by parameter would be enough, but then the next call overlaps and invalidates the first call, and so on.
Changing the action name dynamically would be quite an issue, I suppose, and it would affect the way the endpoint is declared.

@neurosnap
Member

neurosnap commented May 10, 2022

So I haven't actually tested this code out, but theoretically something like this should work.

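import { SagaIterator, Task } from 'redux-saga';
import { all, call, cancel, delay, fork, take } from 'redux-saga/effects';

export function pollMatrix(cancelType: string, parentTimer?: number) {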
  return function* poller(
    actionType: string,
    saga: any,
    ...args: any[]
  ): SagaIterator<void> {
    const pollMap: { [key: string]: Task } = {};

    function* fire(action: { type: string }, timer: number) {
      while (true) {
        yield call(saga, action, ...args);
        yield delay(timer);
      }
    }

    function* watcher(): SagaIterator<void> {
      while (true) {
        const action = yield take(`${actionType}`);
        const timer = action.payload?.timer || parentTimer;
        const task: Task = yield fork(fire, action, timer);
        pollMap[action.payload.key] = task;
      }
    }

    function* canceler(): SagaIterator<void> {
      while (true) {
        const action = yield take(`${cancelType}`);
        const id = action.payload.key
        const task = pollMap[id];
        if (!task) continue;
        yield cancel(task);
        delete pollMap[id];
      }
    }

    yield all([call(canceler), call(watcher)]);
  };
}

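import { useEffect } from 'react';
import { useDispatch } from 'react-redux'; // assumes a react-redux setup
import { createApi } from 'saga-query';

const CANCEL_FETCH_DATA = 'cancel-fetch-data';
// Takes a dispatched endpoint action and forwards only its serialized key,
// which is what the canceler reads from action.payload.key.
const cancelFetchData = ({ payload: { key } }: { payload: { key: string } }) => ({
  type: CANCEL_FETCH_DATA,
  payload: { key },
});
const api = createApi();
const pollFetchData = api.get<{ id: string }>(
  '/data/:id',
  { saga: pollMatrix(CANCEL_FETCH_DATA) },
  api.cache()
);
const Row = ({ itemId }: { itemId: string }) => {
  const dispatch = useDispatch();
  const action = pollFetchData({ id: itemId });
  useEffect(() => {
    dispatch(action);
  }, [itemId]);
  const cancel = () => {
    dispatch(cancelFetchData(action));
  };

  return <button onClick={cancel}>cancel: {itemId}</button>;
};
const Table = () => {
  const items = ['1', '2', '3'];
  return <div>{items.map((itemId) => <Row key={itemId} itemId={itemId} />)}</div>;
};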

There's probably a better approach, but the perk of this setup is that everything is still contained within an endpoint saga. Under the hood, when we create an api endpoint with api.get(...), we serialize the action payload that you pass in when dispatching the endpoint. This should create a unique signature that you can use for the id. You don't have to go this route; you could supply the id yourself by modifying the poller function slightly, but I thought this worked just as well.
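
One edge case of keeping tasks in a plain map: if the same key is dispatched twice, the second poller overwrites the first entry, and the first poller can no longer be cancelled by key. A possible guard, sketched here without redux-saga so it runs on its own, is to cancel whatever is already stored under the key before replacing it. `PollRegistry` and `Cancelable` are hypothetical names; `Cancelable` only stands in for anything with a `cancel()` method.

```typescript
// Minimal stand-in for a running poller: anything that can be cancelled.
interface Cancelable {
  cancel(): void;
}

class PollRegistry {
  private tasks = new Map<string, Cancelable>();

  // Register a poller under `key`, cancelling any poller already stored there.
  start(key: string, task: Cancelable): void {
    this.tasks.get(key)?.cancel();
    this.tasks.set(key, task);
  }

  // Cancel and forget the poller for `key`; returns false when none was registered.
  stop(key: string): boolean {
    const task = this.tasks.get(key);
    if (!task) return false;
    task.cancel();
    this.tasks.delete(key);
    return true;
  }

  get size(): number {
    return this.tasks.size;
  }
}

// Record which fake pollers were cancelled, in order.
const cancelled: string[] = [];
const fakeTask = (label: string): Cancelable => ({
  cancel: () => {
    cancelled.push(label);
  },
});

const registry = new PollRegistry();
registry.start("item-1", fakeTask("first"));
registry.start("item-1", fakeTask("second")); // cancels "first"
registry.start("item-2", fakeTask("other"));
const stopped = registry.stop("item-1"); // cancels "second"
const missing = registry.stop("item-9"); // nothing registered under this key
```

Inside a saga, the equivalent would be to yield a cancel effect for the existing task before storing the newly forked one.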

@VldMrgnn
Contributor Author

VldMrgnn commented May 11, 2022

Hello and thanks. It works perfectly.
I implemented it like:

interface IWatchParams {
	PARAM1_ID: number;  
	PARAM2_ID: number;
} // for example of multiple params. as it doesn't matter how we define our  "ITEM"

const unwatchSome = ({payload:{key}}) => ({ type: 'cancel-fetch-data', payload: {key: key} });

const watchSome = api.post<IWatchParams>(
	`endpoint/path`,
	{ saga: pollMatrix('cancel-fetch-data', 20 * 1000) },
	function* onLoadSaga(ctx: ApiCtx<IReportResponse>, next) {
		/// prepare input data ...
		ctx.request = ctx.req({
			body: JSON.stringify({anyParams: 'anyParams needed in backend'})
		});
		
		yield next();
		
		// manage result...;
	}
);

export function* registerWatch(params: IWatchParams) {
	// Creating the action is synchronous, so no yield is needed here.
	const theAction = watchSome(params);
	yield put(theAction);
	return {
		unsubscribe: unwatchSome(theAction),
	};
}

I needed to call this from another saga, so it went like this:

const localMap = {}; // some Object, Map or Proxy to keep track of registered actions

function* subscribe(id1, id2) {
	const { unsubscribe } = yield call(registerWatch, { PARAM1_ID: id1, PARAM2_ID: id2 });
	// Key by both ids so that different pairs do not collide.
	localMap[`${id1}:${id2}`] = unsubscribe;
}
// and later

function* unsubscribe(id1, id2) {
	const key = `${id1}:${id2}`;
	if (localMap[key] === undefined) return;
	yield put(localMap[key]);
	delete localMap[key];
}

With that I can go further.
Still, it could be done better. Something like this:
inside pollMatrix, the pollMap key could point to a combination (JSON.stringify) of the endpoint path and params,

like pollMap['{"endpointpathstring":{"PARAM1":1,"PARAM2":2}}'] = task;
then we wouldn't need to keep track of the unsubscribe action from outside saga-query.
I'll try this next.

@VldMrgnn
Contributor Author

VldMrgnn commented May 11, 2022

It would be easier like this:

import { SagaIterator, Task } from 'redux-saga';
import { all, delay, call, take, cancel, fork } from 'redux-saga/effects';

export function pollMatrix(cancelType: string, parentTimer?: number) {

	return function* poller(
		actionType: string,
		saga: any,
		...args: any[]
	): SagaIterator<void> {
		const pollMap: { [key: string]: Task } = {};
		
		function* fire(action: { type: string }, timer: number) {
			while (true) {
				yield call(saga, action, ...args);
				yield delay(timer);
			}
		}

		function* watcher(): SagaIterator<void> {
			while (true) {
				const action = yield take(`${actionType}`);
				// Build a key that can be rebuilt later from the action name and params.
				const watcherKeyStr = JSON.stringify({ name: action.payload.name, ...action.payload.options });
				const timer = action.payload?.timer || parentTimer;
				const task: Task = yield fork(fire, action, timer);
				pollMap[watcherKeyStr] = task;
			}
		}

		function* canceler(): SagaIterator<void> {
			while (true) {
				const action = yield take(`${cancelType}`);
				const watcherKeyStr = action.payload;
				const task = pollMap[watcherKeyStr];
				if (!task) continue;
				yield cancel(task);
				delete pollMap[watcherKeyStr];
			}
		}
		yield all([call(canceler), call(watcher)]);
	};
}

then from api:

export const watchSome = api.post<IWatchParams>(
	`endpoint/path`,
	{ saga: pollMatrix('cancel-fetch-data', 20 * 1000) },
	function* onLoadSaga(ctx: ApiCtx<IReportResponse>, next) {
		/// prepare input data ...
		ctx.request = ctx.req({
			body: JSON.stringify({anyParams: 'anyParams needed in backend'})
		});
		
		yield next();
		
		// manage result...;
	}
);

export function* cancelWatch (params) {
	// Here we rebuild the key...
	// We need the "name".
	// Is there a function to get the name out without invoking api.post?
	const {payload:{name}} = yield watchSome(params);
	// Or just write it manually, appending [POST] (is it safe?).
	// Anyway, it will cancel the poll...
	yield put({ type: 'cancel-fetch-data', payload: JSON.stringify({name: name, ...params})});
}

then from the front (in my case another saga)

// register
yield put(watchSome({ GROUPID: GROUPID, ITEMID: id })); // or whatever params we use
// It is important that the params are consistently typed to keep the key stable (e.g. "ID": 1 vs "ID": "1") when stringifying.

// unregister
yield call(cancelWatch, { GROUPID: GROUPID, ITEMID: id });

@VldMrgnn
Contributor Author

Perhaps it would be more elegant just to reuse the "key" parameter that the api builds from actionType and options, and keep it in the pollMap. But not knowing the mechanics of that key, I didn't go in that direction.

@neurosnap
Member

https://github.com/neurosnap/saga-query/blob/main/src/pipe.ts#L146

We serialize the name of the action as well as the options you send to the action as a json encoded, base64 string. We use this key for automatic caching and the like.
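
As a rough illustration of that description (not the library's actual code, which lives at the link above), a key of this kind could be built as below. `createKeySketch` is a hypothetical name, the `/data/:id [GET]` action name is only an example, and `btoa`/`atob` are assumed to be available globally, as they are in browsers and recent Node versions.

```typescript
// Hypothetical helper: JSON-encode the action name and options, then base64 the result.
// Note: btoa only accepts Latin-1 input, so non-Latin-1 option values would need extra handling.
function createKeySketch(name: string, options: unknown): string {
  return btoa(JSON.stringify({ name, options }));
}

// The same name and options always produce the same key...
const keyA = createKeySketch("/data/:id [GET]", { id: "1" });
const keyB = createKeySketch("/data/:id [GET]", { id: "1" });
// ...while a differently typed option (number vs string) produces a different one.
const keyC = createKeySketch("/data/:id [GET]", { id: 1 });

// Decoding the key recovers the original name and options.
const decoded = JSON.parse(atob(keyA)) as { name: string; options: { id: string } };
```

Because the key is derived only from the name and options, it can be rebuilt anywhere the same parameters are known, which is what makes it usable as a pollMap id.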

@VldMrgnn
Contributor Author

Thanks a lot for this. I feel stupid for trying to reinvent the wheel. Your first approach makes total sense. I just missed it.
In this case I'll stick with the following implementation:

import { SagaIterator, Task } from 'redux-saga';
import { all, delay, call, take, cancel, fork } from 'redux-saga/effects';

export function pollMatrix(cancelType: string, parentTimer?: number) {

	return function* poller(
		actionType: string,
		saga: any,
		...args: any[]
	): SagaIterator<void> {
		const pollMap: { [key: string]: Task } = {};
		
		function* fire(action: { type: string }, timer: number) {
			while (true) {
				yield call(saga, action, ...args);
				yield delay(timer);
			}
		}

		function* watcher(): SagaIterator<void> {
			while (true) {
				const action = yield take(`${actionType}`);
				const timer = action.payload?.timer || parentTimer;
				const task: Task = yield fork(fire, action, timer);
				pollMap[action.payload.key]=task; 
			}
		}

		function* canceler(): SagaIterator<void> {
			while (true) {
				const action = yield take(`${cancelType}`);
				const watcherKeyStr = action.payload;
				const task = pollMap[watcherKeyStr];
				if (!task) continue;
				yield cancel(task);
				delete pollMap[watcherKeyStr];
			}
		}
		yield all([call(canceler), call(watcher)]);
	};
}

the api

const watchWorkMap = api.post<IWorkMapParams>(
	`reportshead/wmap`,
	{ saga: pollMatrix('cancel-fetch-data', 20 * 1000) },
	function* onLoadWorkMap(ctx: ApiCtx<IReportResponse>, next) {
//...
});


export function* workMapPolling( exec: 'subscribe'|'unsubscribe', params: IWorkMapParams) {
	const theAction = yield watchWorkMap(params);
	if (exec === 'subscribe') {
		yield put(theAction);
	}
	if (exec === 'unsubscribe') {
		const {payload:{key}} = yield theAction;
		yield put({ type: 'cancel-fetch-data', payload:key});
	}
	return ;
}

the consumer:

import { workMapPolling } from '../apis';
//..
yield call(workMapPolling, 'subscribe',{/*params */});
//OR
yield call(workMapPolling,'unsubscribe', {/*params */});

Perhaps pollMatrix should land in the package as a standard option.

Thanks again for your support!
Vlad

@VldMrgnn
Contributor Author

VldMrgnn commented May 12, 2022

I refactored a little and added some sugar. Please take a look at this forked sandbox.

I have a concern, though. Can we be sure that the key will be the same if the user sets the parameters in a different order?
Let's say: polling(subscribe, {GROUP:1, ITEM:2}) / polling(unsubscribe, {ITEM:2, GROUP:1});

In my tests it works fine, as the order of the keys seems to be managed by the browser somehow.
But what if there are nested objects passed as parameters, or other edge cases?
I think that some procedure to enforce key identity for the same combination of Name + Options would be more reassuring.
On the other hand, this is not a very important situation, as the developer could and should be careful with the parameter definition.

@VldMrgnn VldMrgnn changed the title Custom CancelKey for polling PollMatrix ( cancelable polling by endpoint name and parameters) May 12, 2022
@neurosnap neurosnap changed the title PollMatrix ( cancelable polling by endpoint name and parameters) PollMatrix (cancelable polling by endpoint name and parameters) May 13, 2022
@neurosnap
Member

neurosnap commented May 13, 2022

I think you are most certainly right. We should sort the order of the keys to ensure it's always the same order. I'd be delighted to accept a PR for that change. If not, I'll get to it sometime next week.
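
A minimal sketch of what sorting the keys could look like, assuming the options are plain JSON-like values. `sortKeysDeep` and `stableStringify` are hypothetical helper names, not part of saga-query.

```typescript
// Recursively rebuild objects with their keys in sorted order.
// Arrays keep their element order, since order is meaningful there.
function sortKeysDeep(value: unknown): unknown {
  if (Array.isArray(value)) return value.map(sortKeysDeep);
  if (value !== null && typeof value === "object") {
    const input = value as Record<string, unknown>;
    const sorted: Record<string, unknown> = {};
    for (const k of Object.keys(input).sort()) {
      sorted[k] = sortKeysDeep(input[k]);
    }
    return sorted;
  }
  return value; // primitives are returned unchanged
}

// Stringify with a key order that does not depend on how the object was written.
function stableStringify(value: unknown): string {
  return JSON.stringify(sortKeysDeep(value));
}

const subscribeKey = stableStringify({ GROUP: 1, ITEM: 2 });
const unsubscribeKey = stableStringify({ ITEM: 2, GROUP: 1 });

// Nested objects are normalized as well.
const nestedA = stableStringify({ filter: { to: 5, from: 1 }, ids: [3, 1] });
const nestedB = stableStringify({ ids: [3, 1], filter: { from: 1, to: 5 } });
```

With this in place, subscribe and unsubscribe calls that pass the same parameters in a different order produce the same string. Type differences such as `1` vs `"1"` still produce different keys.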

@VldMrgnn
Contributor Author

I'll be happy to contribute. I'll get on with this.
