Skip to content

Commit 5065573

Browse files
committed
[3/4] Integrate realtime ppl into SDK cache management and tests
1 parent 0c570ab commit 5065573

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

53 files changed

+4598
-2350
lines changed

packages/firestore/src/api/pipeline_impl.ts

+133-2
Original file line numberDiff line numberDiff line change
@@ -15,22 +15,41 @@
1515
* limitations under the License.
1616
*/
1717

18-
import { Pipeline } from '../api/pipeline';
19-
import { firestoreClientExecutePipeline } from '../core/firestore_client';
18+
// Re-adding necessary imports that were removed previously
19+
import {
20+
CompleteFn,
21+
ErrorFn,
22+
isPartialObserver,
23+
NextFn,
24+
PartialObserver
25+
} from '../api/observer';
26+
import {
27+
firestoreClientExecutePipeline,
28+
firestoreClientListen
29+
} from '../core/firestore_client';
30+
import { ListenerDataSource } from '../core/event_manager';
31+
import { toCorePipeline } from '../core/pipeline-util';
32+
import { ViewSnapshot } from '../core/view_snapshot';
2033
import { Pipeline as LitePipeline } from '../lite-api/pipeline';
2134
import { PipelineResult, PipelineSnapshot } from '../lite-api/pipeline-result';
2235
import { PipelineSource } from '../lite-api/pipeline-source';
2336
import { Stage } from '../lite-api/stage';
2437
import { newUserDataReader } from '../lite-api/user_data_reader';
38+
import { FirestoreError } from '../util/error';
2539
import { cast } from '../util/input_validation';
2640

2741
import { ensureFirestoreConfigured, Firestore } from './database';
42+
import { Pipeline } from './pipeline'; // Keep this specific Pipeline import if needed alongside LitePipeline
43+
import { RealtimePipeline } from './realtime_pipeline';
2844
import { DocumentReference } from './reference';
45+
import { SnapshotListenOptions, Unsubscribe } from './reference_impl';
46+
import { RealtimePipelineSnapshot } from './snapshot';
2947
import { ExpUserDataWriter } from './user_data_writer';
3048

3149
declare module './database' {
3250
interface Firestore {
3351
pipeline(): PipelineSource<Pipeline>;
52+
realtimePipeline(): PipelineSource<RealtimePipeline>;
3453
}
3554
}
3655

@@ -71,6 +90,7 @@ declare module './database' {
7190
export function execute(pipeline: LitePipeline): Promise<PipelineSnapshot> {
7291
const firestore = cast(pipeline._db, Firestore);
7392
const client = ensureFirestoreConfigured(firestore);
93+
7494
return firestoreClientExecutePipeline(client, pipeline).then(result => {
7595
// Get the execution time from the first result.
7696
// firestoreClientExecutePipeline returns at least one PipelineStreamElement
@@ -90,6 +110,7 @@ export function execute(pipeline: LitePipeline): Promise<PipelineSnapshot> {
90110
? new DocumentReference(firestore, null, element.key)
91111
: undefined,
92112
element.fields,
113+
element.executionTime?.toTimestamp(),
93114
element.createTime?.toTimestamp(),
94115
element.updateTime?.toTimestamp()
95116
)
@@ -110,3 +131,113 @@ Firestore.prototype.pipeline = function (): PipelineSource<Pipeline> {
110131
);
111132
});
112133
};
134+
135+
Firestore.prototype.realtimePipeline =
136+
function (): PipelineSource<RealtimePipeline> {
137+
return new PipelineSource<RealtimePipeline>(
138+
this._databaseId,
139+
(stages: Stage[]) => {
140+
return new RealtimePipeline(
141+
this,
142+
newUserDataReader(this),
143+
new ExpUserDataWriter(this),
144+
stages
145+
);
146+
}
147+
);
148+
};
149+
150+
/**
151+
* @internal
152+
* @private
153+
*/
154+
export function _onRealtimePipelineSnapshot(
155+
pipeline: RealtimePipeline,
156+
observer: {
157+
next?: (snapshot: RealtimePipelineSnapshot) => void;
158+
error?: (error: FirestoreError) => void;
159+
complete?: () => void;
160+
}
161+
): Unsubscribe;
162+
/**
163+
* @internal
164+
* @private
165+
*/
166+
export function _onRealtimePipelineSnapshot(
167+
pipeline: RealtimePipeline,
168+
options: SnapshotListenOptions,
169+
observer: {
170+
next?: (snapshot: RealtimePipelineSnapshot) => void;
171+
error?: (error: FirestoreError) => void;
172+
complete?: () => void;
173+
}
174+
): Unsubscribe;
175+
/**
176+
* @internal
177+
* @private
178+
*/
179+
export function _onRealtimePipelineSnapshot(
180+
pipeline: RealtimePipeline,
181+
onNext: (snapshot: RealtimePipelineSnapshot) => void,
182+
onError?: (error: FirestoreError) => void,
183+
onComplete?: () => void
184+
): Unsubscribe;
185+
/**
186+
* @internal
187+
* @private
188+
*/
189+
export function _onRealtimePipelineSnapshot(
190+
pipeline: RealtimePipeline,
191+
options: SnapshotListenOptions,
192+
onNext: (snapshot: RealtimePipelineSnapshot) => void,
193+
onError?: (error: FirestoreError) => void,
194+
onComplete?: () => void
195+
): Unsubscribe;
196+
export function _onRealtimePipelineSnapshot(
197+
pipeline: RealtimePipeline,
198+
...args: unknown[]
199+
): Unsubscribe {
200+
let options: SnapshotListenOptions = {
201+
includeMetadataChanges: false,
202+
source: 'default'
203+
};
204+
let currArg = 0;
205+
if (typeof args[currArg] === 'object' && !isPartialObserver(args[currArg])) {
206+
options = args[currArg] as SnapshotListenOptions;
207+
currArg++;
208+
}
209+
210+
const internalOptions = {
211+
includeMetadataChanges: options.includeMetadataChanges,
212+
source: options.source as ListenerDataSource
213+
};
214+
215+
let userObserver: PartialObserver<RealtimePipelineSnapshot>;
216+
if (isPartialObserver(args[currArg])) {
217+
userObserver = args[currArg] as PartialObserver<RealtimePipelineSnapshot>;
218+
} else {
219+
userObserver = {
220+
next: args[currArg] as NextFn<RealtimePipelineSnapshot>,
221+
error: args[currArg + 1] as ErrorFn,
222+
complete: args[currArg + 2] as CompleteFn
223+
};
224+
}
225+
226+
const client = ensureFirestoreConfigured(pipeline._db as Firestore);
227+
const observer = {
228+
next: (snapshot: ViewSnapshot) => {
229+
if (userObserver.next) {
230+
userObserver.next(new RealtimePipelineSnapshot(pipeline, snapshot));
231+
}
232+
},
233+
error: userObserver.error,
234+
complete: userObserver.complete
235+
};
236+
237+
return firestoreClientListen(
238+
client,
239+
toCorePipeline(pipeline),
240+
internalOptions, // Pass parsed options here
241+
observer
242+
);
243+
}

packages/firestore/src/api/reference_impl.ts

+116-4
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,8 @@ import {
3434
firestoreClientListen,
3535
firestoreClientWrite
3636
} from '../core/firestore_client';
37-
import { newQueryForPath, Query as InternalQuery } from '../core/query';
37+
import { QueryOrPipeline, toCorePipeline } from '../core/pipeline-util';
38+
import { newQueryForPath } from '../core/query';
3839
import { ViewSnapshot } from '../core/view_snapshot';
3940
import { FieldPath } from '../lite-api/field_path';
4041
import { validateHasExplicitOrderByForLimitToLast } from '../lite-api/query';
@@ -63,7 +64,13 @@ import { FirestoreError } from '../util/error';
6364
import { cast } from '../util/input_validation';
6465

6566
import { ensureFirestoreConfigured, Firestore } from './database';
66-
import { DocumentSnapshot, QuerySnapshot, SnapshotMetadata } from './snapshot';
67+
import { RealtimePipeline } from './realtime_pipeline';
68+
import {
69+
DocumentSnapshot,
70+
QuerySnapshot,
71+
RealtimePipelineSnapshot,
72+
SnapshotMetadata
73+
} from './snapshot';
6774
import { ExpUserDataWriter } from './user_data_writer';
6875

6976
/**
@@ -190,6 +197,10 @@ export function getDocFromServer<
190197
*
191198
* @returns A `Promise` that will be resolved with the results of the query.
192199
*/
200+
export function getDocs<AppModelType, DbModelType extends DocumentData>(
201+
query: Query<AppModelType, DbModelType>
202+
): Promise<QuerySnapshot<AppModelType, DbModelType>>;
203+
193204
export function getDocs<AppModelType, DbModelType extends DocumentData>(
194205
query: Query<AppModelType, DbModelType>
195206
): Promise<QuerySnapshot<AppModelType, DbModelType>> {
@@ -207,7 +218,7 @@ export function getDocs<AppModelType, DbModelType extends DocumentData>(
207218
new QuerySnapshot<AppModelType, DbModelType>(
208219
firestore,
209220
userDataWriter,
210-
query,
221+
query as Query<AppModelType, DbModelType>,
211222
snapshot
212223
)
213224
);
@@ -657,6 +668,7 @@ export function onSnapshot<AppModelType, DbModelType extends DocumentData>(
657668
onError?: (error: FirestoreError) => void,
658669
onCompletion?: () => void
659670
): Unsubscribe;
671+
660672
export function onSnapshot<AppModelType, DbModelType extends DocumentData>(
661673
reference:
662674
| Query<AppModelType, DbModelType>
@@ -691,7 +703,7 @@ export function onSnapshot<AppModelType, DbModelType extends DocumentData>(
691703

692704
let observer: PartialObserver<ViewSnapshot>;
693705
let firestore: Firestore;
694-
let internalQuery: InternalQuery;
706+
let internalQuery: QueryOrPipeline;
695707

696708
if (reference instanceof DocumentReference) {
697709
firestore = cast(reference.firestore, Firestore);
@@ -744,6 +756,106 @@ export function onSnapshot<AppModelType, DbModelType extends DocumentData>(
744756
);
745757
}
746758

759+
export function onPipelineSnapshot<
760+
AppModelType,
761+
DbModelType extends DocumentData
762+
>(
763+
query: RealtimePipeline,
764+
observer: {
765+
next?: (snapshot: RealtimePipelineSnapshot) => void;
766+
error?: (error: FirestoreError) => void;
767+
complete?: () => void;
768+
}
769+
): Unsubscribe;
770+
export function onPipelineSnapshot<
771+
AppModelType,
772+
DbModelType extends DocumentData
773+
>(
774+
query: RealtimePipeline,
775+
options: SnapshotListenOptions,
776+
observer: {
777+
next?: (snapshot: RealtimePipelineSnapshot) => void;
778+
error?: (error: FirestoreError) => void;
779+
complete?: () => void;
780+
}
781+
): Unsubscribe;
782+
export function onPipelineSnapshot<
783+
AppModelType,
784+
DbModelType extends DocumentData
785+
>(
786+
query: RealtimePipeline,
787+
onNext: (snapshot: RealtimePipelineSnapshot) => void,
788+
onError?: (error: FirestoreError) => void,
789+
onCompletion?: () => void
790+
): Unsubscribe;
791+
export function onPipelineSnapshot<
792+
AppModelType,
793+
DbModelType extends DocumentData
794+
>(
795+
query: RealtimePipeline,
796+
options: SnapshotListenOptions,
797+
onNext: (snapshot: RealtimePipelineSnapshot) => void,
798+
onError?: (error: FirestoreError) => void,
799+
onCompletion?: () => void
800+
): Unsubscribe;
801+
export function onPipelineSnapshot<
802+
AppModelType,
803+
DbModelType extends DocumentData
804+
>(reference: RealtimePipeline, ...args: unknown[]): Unsubscribe {
805+
reference = getModularInstance(reference);
806+
807+
let options: SnapshotListenOptions = {
808+
includeMetadataChanges: false,
809+
source: 'default'
810+
};
811+
let currArg = 0;
812+
if (typeof args[currArg] === 'object' && !isPartialObserver(args[currArg])) {
813+
options = args[currArg] as SnapshotListenOptions;
814+
currArg++;
815+
}
816+
817+
const internalOptions = {
818+
includeMetadataChanges: options.includeMetadataChanges,
819+
source: options.source as ListenerDataSource
820+
};
821+
822+
if (isPartialObserver(args[currArg])) {
823+
const userObserver = args[currArg] as PartialObserver<
824+
QuerySnapshot<AppModelType, DbModelType>
825+
>;
826+
args[currArg] = userObserver.next?.bind(userObserver);
827+
args[currArg + 1] = userObserver.error?.bind(userObserver);
828+
args[currArg + 2] = userObserver.complete?.bind(userObserver);
829+
}
830+
831+
let observer: PartialObserver<ViewSnapshot>;
832+
let firestore: Firestore;
833+
let internalQuery: QueryOrPipeline;
834+
835+
// RealtimePipeline
836+
firestore = cast(reference._db, Firestore);
837+
internalQuery = toCorePipeline(reference);
838+
observer = {
839+
next: snapshot => {
840+
if (args[currArg]) {
841+
(args[currArg] as NextFn<RealtimePipelineSnapshot>)(
842+
new RealtimePipelineSnapshot(reference as RealtimePipeline, snapshot)
843+
);
844+
}
845+
},
846+
error: args[currArg + 1] as ErrorFn,
847+
complete: args[currArg + 2] as CompleteFn
848+
};
849+
850+
const client = ensureFirestoreConfigured(firestore);
851+
return firestoreClientListen(
852+
client,
853+
internalQuery,
854+
internalOptions,
855+
observer
856+
);
857+
}
858+
747859
// TODO(firestorexp): Make sure these overloads are tested via the Firestore
748860
// integration tests
749861

0 commit comments

Comments
 (0)