Skip to content

Commit

Permalink
Merge pull request #344 from supabase/fix/promise-allsettled-refactor
Browse files Browse the repository at this point in the history
fix: refactor removeAllSubscriptions to use Promise.all
  • Loading branch information
w3b6x9 committed Jan 20, 2022
2 parents 5a92fd8 + 7c166e8 commit f9ef9f3
Showing 1 changed file with 58 additions and 23 deletions.
81 changes: 58 additions & 23 deletions src/SupabaseClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -133,43 +133,79 @@ export default class SupabaseClient {
/**
* Remove all subscriptions.
*/
async removeAllSubscriptions() {
const subscriptions = this.realtime.channels.slice()
return await Promise.allSettled(subscriptions.map((sub) => this.removeSubscription(sub)))
async removeAllSubscriptions(): Promise<
(
| {
status: 'fulfilled'
value: {
error: null
}
}
| { status: 'rejected'; reason: { error: Error } }
)[]
> {
const subs: RealtimeSubscription[] = this.realtime.channels.slice()
const removeSubPromises = subs.map((sub: RealtimeSubscription) =>
this.removeSubscription(sub)
.then((): { status: 'fulfilled'; value: { error: null } } => ({
status: 'fulfilled',
value: { error: null },
}))
.catch((reason: { error: Error }): { status: 'rejected'; reason: { error: Error } } => ({
status: 'rejected',
reason,
}))
)
return Promise.all(removeSubPromises)
}

/**
* Removes an active subscription and returns the number of open connections.
*
* @param subscription The subscription you want to remove.
*/
removeSubscription(subscription: RealtimeSubscription) {
return new Promise(async (resolve) => {
try {
await this._closeSubscription(subscription)
removeSubscription(subscription: RealtimeSubscription): Promise<
| {
data: { openSubscriptions: number }
error: null
}
| { error: Error }
> {
return new Promise(async (resolve, reject) => {
const { error } = await this._closeSubscription(subscription)

if (error) {
return reject({ error })
}

const allSubscriptions = this.getSubscriptions()
const openSubscriptionsCount = allSubscriptions.filter((chan) => chan.isJoined()).length

const allSubscriptions = this.getSubscriptions()
const openSubscriptionsCount = allSubscriptions.filter((chan) => chan.isJoined()).length
if (allSubscriptions.length === 0) {
const { error } = await this.realtime.disconnect()

if (!allSubscriptions.length) {
const { error } = await this.realtime.disconnect()
if (error) return resolve({ error })
if (error) {
return reject({ error })
}
return resolve({ error: null, data: { openSubscriptions: openSubscriptionsCount } })
} catch (error) {
return resolve({ error })
}

return resolve({
data: { openSubscriptions: openSubscriptionsCount },
error: null,
})
})
}

private async _closeSubscription(subscription: RealtimeSubscription) {
private async _closeSubscription(
subscription: RealtimeSubscription
): Promise<{ error: null | Error }> {
if (!subscription.isClosed()) {
await this._closeChannel(subscription)
return await this._closeChannel(subscription)
}

return new Promise((resolve) => {
this.realtime.remove(subscription)
return resolve(true)
return resolve({ error: null })
})
}

Expand Down Expand Up @@ -226,14 +262,13 @@ export default class SupabaseClient {
return headers
}

private _closeChannel(subscription: RealtimeSubscription) {
private _closeChannel(subscription: RealtimeSubscription): Promise<{ error: null | Error }> {
return new Promise((resolve, reject) => {
subscription
.unsubscribe()
.receive('ok', () => {
return resolve(true)
})
.receive('error', (e: Error) => reject(e))
.receive('ok', () => resolve({ error: null }))
.receive('error', (error: Error) => reject({ error }))
.receive('timeout', () => reject({ error: 'timed out' }))
})
}

Expand Down

0 comments on commit f9ef9f3

Please sign in to comment.