Skip to content

Commit dfbb7a8

Browse files
committed
improve the internal apis for node history
1 parent d5b1e3c commit dfbb7a8

File tree

6 files changed

+265
-76
lines changed

6 files changed

+265
-76
lines changed

raphtory/src/core/entities/properties/tcell.rs

+42
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,48 @@ impl<A: Send + Sync> TimeIndexOps for TCell<A> {
232232
}
233233
}
234234

235+
impl<'b, A: Send + Sync> TimeIndexOps for &'b TCell<A> {
236+
type IndexType = TimeIndexEntry;
237+
type RangeType<'a>
238+
= TimeIndexWindow<'a, TimeIndexEntry, TCell<A>>
239+
where
240+
Self: 'a;
241+
242+
fn active(&self, w: Range<Self::IndexType>) -> bool {
243+
TimeIndexOps::active(*self, w)
244+
}
245+
246+
fn range(&self, w: Range<Self::IndexType>) -> Self::RangeType<'_> {
247+
TimeIndexOps::range(*self, w)
248+
}
249+
250+
fn first(&self) -> Option<Self::IndexType> {
251+
TimeIndexOps::first(*self)
252+
}
253+
254+
fn last(&self) -> Option<Self::IndexType> {
255+
TimeIndexOps::last(*self)
256+
}
257+
258+
fn iter(&self) -> BoxedLIter<Self::IndexType> {
259+
TimeIndexOps::iter(*self)
260+
}
261+
262+
fn len(&self) -> usize {
263+
TimeIndexOps::len(*self)
264+
}
265+
}
266+
267+
impl<'a, A: Send + Sync> TimeIndexLike for &'a TCell<A> {
268+
fn range_iter(&self, w: Range<Self::IndexType>) -> BoxedLIter<Self::IndexType> {
269+
Box::new(self.iter_window(w).map(|(ti, _)| *ti))
270+
}
271+
272+
fn last_range(&self, w: Range<Self::IndexType>) -> Option<Self::IndexType> {
273+
self.iter_window(w).next_back().map(|(ti, _)| *ti)
274+
}
275+
}
276+
235277
#[cfg(test)]
236278
mod tcell_tests {
237279
use super::TCell;

raphtory/src/db/api/storage/graph/storage_ops/time_semantics.rs

+8-2
Original file line numberDiff line numberDiff line change
@@ -129,8 +129,14 @@ impl TimeSemantics for GraphStorage {
129129
Some(w) => node
130130
.additions()
131131
.into_range(TimeIndexEntry::range(w))
132-
.into_prop_events(),
133-
None => node.additions().into_prop_events(),
132+
.into_prop_events()
133+
.into_iter()
134+
.into_dyn_boxed(),
135+
None => node
136+
.additions()
137+
.into_prop_events()
138+
.into_iter()
139+
.into_dyn_boxed(),
134140
})
135141
.into_dyn_boxed()
136142
}

raphtory/src/db/api/storage/graph/variants/storage_variants.rs

+69-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
use crate::{core::Prop, db::api::storage::graph::tprop_storage_ops::TPropOps};
2-
use raphtory_api::core::storage::timeindex::TimeIndexEntry;
2+
use raphtory_api::{
3+
core::storage::timeindex::{TimeIndexEntry, TimeIndexIntoOps, TimeIndexOps},
4+
iter::BoxedLIter,
5+
};
36
use rayon::iter::{
47
plumbing::{Consumer, ProducerCallback, UnindexedConsumer},
58
IndexedParallelIterator, ParallelIterator,
@@ -278,3 +281,68 @@ impl<'a, Mem: TPropOps<'a> + 'a, #[cfg(feature = "storage")] Disk: TPropOps<'a>
278281
for_all!(self, props => props.at(ti))
279282
}
280283
}
284+
285+
impl<
286+
Mem: TimeIndexOps,
287+
#[cfg(feature = "storage")] Disk: TimeIndexOps<IndexType = Mem::IndexType>,
288+
> TimeIndexOps for SelfType!(Mem, Disk)
289+
{
290+
type IndexType = Mem::IndexType;
291+
292+
#[cfg(feature = "storage")]
293+
type RangeType<'a>
294+
= StorageVariants<Mem::RangeType<'a>, Disk::RangeType<'a>>
295+
where
296+
Self: 'a;
297+
298+
#[cfg(not(feature = "storage"))]
299+
type RangeType<'a>
300+
= Mem::RangeType<'a>
301+
where
302+
Self: 'a;
303+
304+
fn active(&self, w: Range<Self::IndexType>) -> bool {
305+
for_all!(self, props => props.active(w))
306+
}
307+
308+
fn range(&self, w: Range<Self::IndexType>) -> Self::RangeType<'_> {
309+
for_all_iter!(self, props => props.range(w))
310+
}
311+
312+
fn first(&self) -> Option<Self::IndexType> {
313+
for_all!(self, props => props.first())
314+
}
315+
316+
fn last(&self) -> Option<Self::IndexType> {
317+
for_all!(self, props => props.last())
318+
}
319+
320+
fn iter(&self) -> BoxedLIter<Self::IndexType> {
321+
for_all!(self, props => props.iter())
322+
}
323+
324+
fn len(&self) -> usize {
325+
for_all!(self, props => props.len())
326+
}
327+
}
328+
329+
impl<
330+
Mem: TimeIndexIntoOps,
331+
#[cfg(feature = "storage")] Disk: TimeIndexIntoOps<IndexType = Mem::IndexType>,
332+
> TimeIndexIntoOps for SelfType!(Mem, Disk)
333+
{
334+
type IndexType = Mem::IndexType;
335+
#[cfg(feature = "storage")]
336+
type RangeType = StorageVariants<Mem::RangeType, Disk::RangeType>;
337+
338+
#[cfg(not(feature = "storage"))]
339+
type RangeType = Mem::RangeType;
340+
341+
fn into_range(self, w: Range<Self::IndexType>) -> Self::RangeType {
342+
for_all_iter!(self, props => props.into_range(w))
343+
}
344+
345+
fn into_iter(self) -> impl Iterator<Item = Self::IndexType> + Send + Sync {
346+
for_all_iter!(self, props => props.into_iter())
347+
}
348+
}

raphtory/src/db/api/view/internal/core_ops.rs

+18-10
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ use raphtory_api::core::{
3333
};
3434
use std::{iter, ops::Range};
3535

36+
use crate::db::api::storage::graph::variants::storage_variants::StorageVariants;
3637
#[cfg(feature = "storage")]
3738
use pometry_storage::timestamps::LayerAdditions;
3839

@@ -375,20 +376,27 @@ pub enum NodeAdditions<'a> {
375376
}
376377

377378
impl<'a> NodeAdditions<'a> {
378-
pub fn into_prop_events(self) -> BoxedLIter<'a, TimeIndexEntry> {
379+
pub fn into_prop_events(
380+
self,
381+
) -> impl TimeIndexOps<IndexType = TimeIndexEntry>
382+
+ TimeIndexIntoOps<IndexType = TimeIndexEntry>
383+
+ use<'a> {
379384
match self {
380-
NodeAdditions::Mem(index) => Box::new(index.props_ts.iter().map(|(time, _)| *time)),
381-
NodeAdditions::Range(index) => match index {
382-
TimeIndexWindow::Empty => Box::new(iter::empty()),
385+
NodeAdditions::Mem(index) => {
386+
StorageVariants::Mem(TimeIndexWindow::All(&index.props_ts))
387+
}
388+
NodeAdditions::Range(index) => StorageVariants::Mem(match index {
389+
TimeIndexWindow::Empty => TimeIndexWindow::Empty,
383390
TimeIndexWindow::TimeIndexRange { timeindex, range } => {
384-
Box::new(timeindex.props_ts.range_iter(range))
391+
TimeIndexWindow::TimeIndexRange {
392+
timeindex: &timeindex.props_ts,
393+
range,
394+
}
385395
}
386-
TimeIndexWindow::All(idx) => Box::new(idx.props_ts.iter().map(|(time, _)| *time)),
387-
},
396+
TimeIndexWindow::All(idx) => TimeIndexWindow::All(&idx.props_ts),
397+
}),
388398
#[cfg(feature = "storage")]
389-
NodeAdditions::Col(index) => {
390-
Box::new(index.prop_events().map(|ts| ts.into_iter()).kmerge())
391-
}
399+
NodeAdditions::Col(index) => StorageVariants::Disk(index.prop_events()),
392400
}
393401
}
394402

raphtory/src/db/graph/views/deletion_graph.rs

+23-1
Original file line numberDiff line numberDiff line change
@@ -336,7 +336,29 @@ impl TimeSemantics for PersistentGraph {
336336
}
337337

338338
fn node_property_history(&self, v: VID, w: Option<Range<i64>>) -> BoxedLIter<TimeIndexEntry> {
339-
self.0.node_property_history(v, w)
339+
match w {
340+
None => self.0.node_property_history(v, w),
341+
Some(w) => {
342+
let node = self.core_node_entry(v);
343+
GenLockedIter::from(node, |node| {
344+
has_persisted_event(
345+
&node.additions().into_prop_events(),
346+
&TimeIndex::Empty,
347+
w.start,
348+
)
349+
.then_some(TimeIndexEntry::start(w.start))
350+
.into_iter()
351+
.chain(
352+
node.additions()
353+
.into_range_t(w)
354+
.into_prop_events()
355+
.into_iter(),
356+
)
357+
.into_dyn_boxed()
358+
})
359+
.into_dyn_boxed()
360+
}
361+
}
340362
}
341363

342364
fn edge_history<'a>(

0 commit comments

Comments
 (0)