|
1 | 1 | import {type SanityClient} from '@sanity/client'
|
2 | 2 | import {type Mutation} from '@sanity/mutator'
|
3 | 3 | import {type SanityDocument} from '@sanity/types'
|
4 |
| -import {EMPTY, from, merge, type Observable} from 'rxjs' |
| 4 | +import {EMPTY, from, merge, type Observable, Subject} from 'rxjs' |
5 | 5 | import {filter, map, mergeMap, mergeMapTo, share, tap} from 'rxjs/operators'
|
6 | 6 |
|
7 | 7 | import {
|
@@ -63,6 +63,7 @@ export interface Pair {
|
63 | 63 | transactionsPendingEvents$: Observable<PendingMutationsEvent>
|
64 | 64 | published: DocumentVersion
|
65 | 65 | draft: DocumentVersion
|
| 66 | + complete: () => void |
66 | 67 | }
|
67 | 68 |
|
68 | 69 | function setVersion<T>(version: 'draft' | 'published') {
|
@@ -105,7 +106,10 @@ function submitCommitRequest(client: SanityClient, request: CommitRequest) {
|
105 | 106 | export function checkoutPair(client: SanityClient, idPair: IdPair): Pair {
|
106 | 107 | const {publishedId, draftId} = idPair
|
107 | 108 |
|
108 |
| - const listenerEvents$ = getPairListener(client, idPair).pipe(share()) |
| 109 | + const listenerEventsConnector = new Subject<ListenerEvent>() |
| 110 | + const listenerEvents$ = getPairListener(client, idPair).pipe( |
| 111 | + share({connector: () => listenerEventsConnector}), |
| 112 | + ) |
109 | 113 |
|
110 | 114 | const reconnect$ = listenerEvents$.pipe(
|
111 | 115 | filter((ev) => ev.type === 'reconnect'),
|
@@ -146,5 +150,6 @@ export function checkoutPair(client: SanityClient, idPair: IdPair): Pair {
|
146 | 150 | consistency$: published.consistency$,
|
147 | 151 | remoteSnapshot$: published.remoteSnapshot$.pipe(map(setVersion('published'))),
|
148 | 152 | },
|
| 153 | + complete: () => listenerEventsConnector.complete(), |
149 | 154 | }
|
150 | 155 | }
|
0 commit comments