Skip to content

Commit

Permalink
feat: replace existing Realtime API with new Realtime API
Browse files Browse the repository at this point in the history
  • Loading branch information
w3b6x9 committed Jun 17, 2022
1 parent f37643c commit 04f66b4
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 146 deletions.
87 changes: 29 additions & 58 deletions src/SupabaseClient.ts
@@ -1,18 +1,13 @@
import { FunctionsClient } from '@supabase/functions-js'
import { AuthChangeEvent } from '@supabase/gotrue-js'
import { PostgrestClient } from '@supabase/postgrest-js'
import {
RealtimeChannel,
RealtimeClient,
RealtimeClientOptions,
RealtimeSubscription,
} from '@supabase/realtime-js'
import { PostgrestClient, PostgrestQueryBuilder } from '@supabase/postgrest-js'
import { RealtimeChannel, RealtimeClient, RealtimeClientOptions } from '@supabase/realtime-js'
import { SupabaseStorageClient } from '@supabase/storage-js'
import { DEFAULT_HEADERS, STORAGE_KEY } from './lib/constants'
import { fetchWithAuth } from './lib/fetch'
import { isBrowser, stripTrailingSlash } from './lib/helpers'
import { SupabaseAuthClient } from './lib/SupabaseAuthClient'
import { SupabaseQueryBuilder } from './lib/SupabaseQueryBuilder'
import { SupabaseRealtimeClient } from './lib/SupabaseRealtimeClient'
import { Fetch, SupabaseClientOptions } from './lib/types'

const DEFAULT_OPTIONS = {
Expand Down Expand Up @@ -129,13 +124,11 @@ export default class SupabaseClient {
*
* @param table The table name to operate on.
*/
from<T = any>(table: string): SupabaseQueryBuilder<T> {
from<T = any>(table: string): PostgrestQueryBuilder<T> {
const url = `${this.restUrl}/${table}`
return new SupabaseQueryBuilder<T>(url, {
return new PostgrestQueryBuilder<T>(url, {
headers: this.headers,
schema: this.schema,
realtime: this.realtime,
table,
fetch: this.fetch,
shouldThrowOnError: this.shouldThrowOnError,
})
Expand Down Expand Up @@ -164,32 +157,31 @@ export default class SupabaseClient {

/**
* Creates a channel with Broadcast and Presence.
* Activated when vsndate query param is present in the WebSocket URL.
*/
channel(name: string, opts: { selfBroadcast: boolean; [key: string]: any }): RealtimeChannel {
const userToken = this.auth.session()?.access_token ?? this.supabaseKey
channel(name: string, opts?: { [key: string]: any }): SupabaseRealtimeClient {
const token = this.realtime.accessToken ?? this.supabaseKey

if (!this.realtime.isConnected()) {
this.realtime.connect()
}

return this.realtime.channel(name, { ...opts, user_token: userToken }) as RealtimeChannel
return new SupabaseRealtimeClient(this.realtime, name, token, opts)
}

/**
* Closes and removes all subscriptions and returns a list of removed
* subscriptions and their errors.
* Closes and removes all channels and returns a list of removed
* channels and their errors.
*/
async removeAllSubscriptions(): Promise<
{ data: { subscription: RealtimeSubscription }; error: Error | null }[]
async removeAllChannels(): Promise<
{ data: { channels: RealtimeChannel }; error: Error | null }[]
> {
const allSubs: RealtimeSubscription[] = this.getSubscriptions().slice()
const allSubPromises = allSubs.map((sub) => this.removeSubscription(sub))
const allRemovedSubs = await Promise.all(allSubPromises)
const allChans: RealtimeChannel[] = this.getChannels().slice()
const allChanPromises = allChans.map((chan) => this.removeChannel(chan))
const allRemovedChans = await Promise.all(allChanPromises)

return allRemovedSubs.map(({ error }, i) => {
return allRemovedChans.map(({ error }, i) => {
return {
data: { subscription: allSubs[i] },
data: { channels: allChans[i] },
error,
}
})
Expand All @@ -203,58 +195,37 @@ export default class SupabaseClient {
async removeChannel(
channel: RealtimeChannel
): Promise<{ data: { openChannels: number }; error: Error | null }> {
const { error } = await this._closeSubscription(channel)
const allChans: RealtimeSubscription[] = this.getSubscriptions()
const { error } = await this._closeChannel(channel)
const allChans: RealtimeChannel[] = this.getChannels()
const openChanCount = allChans.filter((chan) => chan.isJoined()).length

if (allChans.length === 0) await this.realtime.disconnect()

return { data: { openChannels: openChanCount }, error }
}

/**
* Closes and removes a subscription and returns the number of open subscriptions.
*
* @param subscription The subscription you want to close and remove.
*/
async removeSubscription(
subscription: RealtimeSubscription
): Promise<{ data: { openSubscriptions: number }; error: Error | null }> {
const { error } = await this._closeSubscription(subscription)
const allSubs: RealtimeSubscription[] = this.getSubscriptions()
const openSubCount = allSubs.filter((chan) => chan.isJoined()).length

if (allSubs.length === 0) await this.realtime.disconnect()

return { data: { openSubscriptions: openSubCount }, error }
}

private async _getAccessToken() {
const { session } = await this.auth.getSession()

return session?.access_token ?? null
}

private async _closeSubscription(
subscription: RealtimeSubscription | RealtimeChannel
): Promise<{ error: Error | null }> {
private async _closeChannel(channel: RealtimeChannel): Promise<{ error: Error | null }> {
let error = null

if (!subscription.isClosed()) {
const { error: unsubError } = await this._unsubscribeSubscription(subscription)
if (!channel.isClosed()) {
const { error: unsubError } = await this._unsubscribeChannel(channel)
error = unsubError
}

this.realtime.remove(subscription)
this.realtime.remove(channel)

return { error }
}

private _unsubscribeSubscription(
subscription: RealtimeSubscription | RealtimeChannel
): Promise<{ error: Error | null }> {
private _unsubscribeChannel(channel: RealtimeChannel): Promise<{ error: Error | null }> {
return new Promise((resolve) => {
subscription
channel
.unsubscribe()
.receive('ok', () => resolve({ error: null }))
.receive('error', (error: Error) => resolve({ error }))
Expand All @@ -263,10 +234,10 @@ export default class SupabaseClient {
}

/**
* Returns an array of all your subscriptions.
* Returns an array of all your channels.
*/
getSubscriptions(): RealtimeSubscription[] {
return this.realtime.channels as RealtimeSubscription[]
getChannels(): RealtimeChannel[] {
return this.realtime.channels as RealtimeChannel[]
}

private _initSupabaseAuthClient({
Expand Down Expand Up @@ -299,7 +270,7 @@ export default class SupabaseClient {
private _initRealtimeClient(options?: RealtimeClientOptions) {
return new RealtimeClient(this.realtimeUrl, {
...options,
params: { ...options?.params, apikey: this.supabaseKey },
params: { ...{ apikey: this.supabaseKey, vsndate: '2022' }, ...options?.params },
})
}

Expand Down
61 changes: 0 additions & 61 deletions src/lib/SupabaseQueryBuilder.ts

This file was deleted.

54 changes: 29 additions & 25 deletions src/lib/SupabaseRealtimeClient.ts
@@ -1,19 +1,17 @@
import { RealtimeSubscription, RealtimeClient, Transformers } from '@supabase/realtime-js'
import { GenericObject, SupabaseEventTypes, SupabaseRealtimePayload } from './types'
import { RealtimeChannel, RealtimeClient, Transformers } from '@supabase/realtime-js'
import { GenericObject, SupabaseRealtimePayload } from './types'

export class SupabaseRealtimeClient {
subscription: RealtimeSubscription
channel: RealtimeChannel

constructor(socket: RealtimeClient, headers: GenericObject, schema: string, tableName: string) {
const chanParams: GenericObject = {}
const topic = tableName === '*' ? `realtime:${schema}` : `realtime:${schema}:${tableName}`
const userToken = headers['Authorization'].split(' ')[1]
constructor(socket: RealtimeClient, name: string, token: string, opts?: { [key: string]: any }) {
let chanParams: GenericObject = { user_token: token }

if (userToken) {
chanParams['user_token'] = userToken
if (opts) {
chanParams = { ...chanParams, ...opts }
}

this.subscription = socket.channel(topic, chanParams) as RealtimeSubscription
this.channel = socket.channel(`realtime:${name}`, chanParams) as RealtimeChannel
}

private getPayloadRecords(payload: any) {
Expand All @@ -37,38 +35,44 @@ export class SupabaseRealtimeClient {
* The event you want to listen to.
*
* @param event The event
* @param filter An object that specifies what you want to listen to from the event.
* @param callback A callback function that is called whenever the event occurs.
*/
on(event: SupabaseEventTypes, callback: (payload: SupabaseRealtimePayload<any>) => void) {
this.subscription.on(event, (payload: any) => {
on(
event: string,
filter?: GenericObject,
callback?: (payload: SupabaseRealtimePayload<any>) => void
) {
this.channel.on(event, filter ?? {}, (payload: any) => {
const { schema, table, commit_timestamp, type, errors } = payload.payload
let enrichedPayload: SupabaseRealtimePayload<any> = {
schema: payload.schema,
table: payload.table,
commit_timestamp: payload.commit_timestamp,
eventType: payload.type,
schema: schema,
table: table,
commit_timestamp: commit_timestamp,
eventType: type,
new: {},
old: {},
errors: payload.errors,
errors: errors,
}

enrichedPayload = { ...enrichedPayload, ...this.getPayloadRecords(payload) }
enrichedPayload = { ...enrichedPayload, ...this.getPayloadRecords(payload.payload) }

callback(enrichedPayload)
callback && callback(enrichedPayload)
})
return this
}

/**
* Enables the subscription.
* Enables the channel.
*/
subscribe(callback: Function = () => {}) {
this.subscription.onError((e: Error) => callback('SUBSCRIPTION_ERROR', e))
this.subscription.onClose(() => callback('CLOSED'))
this.subscription
this.channel.onError((e: Error) => callback('CHANNEL_ERROR', e))
this.channel.onClose(() => callback('CLOSED'))
this.channel
.subscribe()
.receive('ok', () => callback('SUBSCRIBED'))
.receive('error', (e: Error) => callback('SUBSCRIPTION_ERROR', e))
.receive('error', (e: Error) => callback('CHANNEL_ERROR', e))
.receive('timeout', () => callback('RETRYING_AFTER_TIMEOUT'))
return this.subscription
return this.channel
}
}
2 changes: 0 additions & 2 deletions src/lib/types.ts
Expand Up @@ -71,5 +71,3 @@ export type SupabaseRealtimePayload<T> = {
old: T
errors: string[] | null
}

export type SupabaseEventTypes = 'INSERT' | 'UPDATE' | 'DELETE' | '*'

0 comments on commit 04f66b4

Please sign in to comment.