Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 37 additions & 15 deletions turbopack/crates/turbo-tasks-backend/src/backend/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -799,9 +799,15 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
}
}

let ReadCellOptions {
is_transient_cell,
tracking,
final_read_hint,
} = options;

let mut ctx = self.execute_context(turbo_tasks);
let (mut task, reader_task) = if self.should_track_dependencies()
&& !matches!(options.tracking, ReadTracking::Untracked)
&& !matches!(tracking, ReadTracking::Untracked)
&& let Some(reader_id) = reader
&& reader_id != task_id
{
Expand All @@ -814,16 +820,13 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
(ctx.task(task_id, TaskDataCategory::Data), None)
};

let content = if options.final_read_hint {
remove!(task, CellData { cell })
} else if let Some(content) = get!(task, CellData { cell }) {
let content = content.clone();
Some(content)
let content = if final_read_hint {
task.remove_cell_data(is_transient_cell, cell)
} else {
None
task.get_cell_data(is_transient_cell, cell)
};
if let Some(content) = content {
if options.tracking.should_track(false) {
if tracking.should_track(false) {
add_cell_dependency(task_id, task, reader, reader_task, cell);
}
return Ok(Ok(TypedCellContent(
Expand All @@ -850,7 +853,7 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
)
.copied();
let Some(max_id) = max_id else {
if options.tracking.should_track(true) {
if tracking.should_track(true) {
add_cell_dependency(task_id, task, reader, reader_task, cell);
}
bail!(
Expand All @@ -859,7 +862,7 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
);
};
if cell.index >= max_id {
if options.tracking.should_track(true) {
if tracking.should_track(true) {
add_cell_dependency(task_id, task, reader, reader_task, cell);
}
bail!(
Expand Down Expand Up @@ -1959,6 +1962,14 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
.get(&cell.type_id).is_none_or(|start_index| cell.index >= *start_index))
}));
}
if task_id.is_transient() || iter_many!(task, TransientCellData { cell }
if cell_counters.get(&cell.type_id).is_none_or(|start_index| cell.index >= *start_index) => cell
).count() > 0 {
removed_data.extend(task.extract_if(CachedDataItemType::TransientCellData, |key, _| {
matches!(key, CachedDataItemKey::TransientCellData { cell } if cell_counters
.get(&cell.type_id).is_none_or(|start_index| cell.index >= *start_index))
}));
}

old_edges.extend(
task.iter(CachedDataItemType::OutdatedCollectible)
Expand Down Expand Up @@ -2388,6 +2399,7 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
fn task_execution_completed_cleanup(&self, ctx: &mut impl ExecuteContext<'_>, task_id: TaskId) {
let mut task = ctx.task(task_id, TaskDataCategory::All);
task.shrink_to_fit(CachedDataItemType::CellData);
task.shrink_to_fit(CachedDataItemType::TransientCellData);
task.shrink_to_fit(CachedDataItemType::CellTypeMaxIndex);
task.shrink_to_fit(CachedDataItemType::CellDependency);
task.shrink_to_fit(CachedDataItemType::OutputDependency);
Expand Down Expand Up @@ -2523,13 +2535,14 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
&self,
task_id: TaskId,
cell: CellId,
_options: ReadCellOptions,
options: ReadCellOptions,
turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
) -> Result<TypedCellContent> {
let mut ctx = self.execute_context(turbo_tasks);
let task = ctx.task(task_id, TaskDataCategory::Data);
if let Some(content) = get!(task, CellData { cell }) {
Ok(CellContent(Some(content.reference.clone())).into_typed(cell.type_id))
if let Some(content) = task.get_cell_data(options.is_transient_cell, cell) {
debug_assert!(content.type_id == cell.type_id, "Cell type ID mismatch");
Ok(CellContent(Some(content.reference)).into_typed(cell.type_id))
} else {
Ok(CellContent(None).into_typed(cell.type_id))
}
Expand Down Expand Up @@ -2670,6 +2683,7 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
&self,
task_id: TaskId,
cell: CellId,
is_transient_cell: bool,
content: CellContent,
verification_mode: VerificationMode,
turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
Expand All @@ -2678,6 +2692,7 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
task_id,
cell,
content,
is_transient_cell,
verification_mode,
self.execute_context(turbo_tasks),
);
Expand Down Expand Up @@ -3265,12 +3280,19 @@ impl<B: BackingStorage> Backend for TurboTasksBackend<B> {
&self,
task_id: TaskId,
cell: CellId,
is_transient_cell: bool,
content: CellContent,
verification_mode: VerificationMode,
turbo_tasks: &dyn TurboTasksBackendApi<Self>,
) {
self.0
.update_task_cell(task_id, cell, content, verification_mode, turbo_tasks);
self.0.update_task_cell(
task_id,
cell,
is_transient_cell,
content,
verification_mode,
turbo_tasks,
);
}

fn mark_own_task_as_finished(
Expand Down
31 changes: 29 additions & 2 deletions turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,15 @@ use std::{
};

use serde::{Deserialize, Serialize};
use turbo_tasks::{FxIndexMap, KeyValuePair, TaskId, TurboTasksBackendApi};
use turbo_tasks::{
CellId, FxIndexMap, KeyValuePair, TaskId, TurboTasksBackendApi, TypedSharedReference,
};

use crate::{
backend::{
OperationGuard, TaskDataCategory, TransientTask, TurboTasksBackend, TurboTasksBackendInner,
TurboTasksBackendJob,
storage::{SpecificTaskDataCategory, StorageWriteGuard, get, iter_many},
storage::{SpecificTaskDataCategory, StorageWriteGuard, get, iter_many, remove},
},
backing_storage::BackingStorage,
data::{
Expand Down Expand Up @@ -467,6 +469,31 @@ pub trait TaskGuard: Debug {
.unwrap_or_default();
dirty_count < clean_count
}
fn remove_cell_data(
&mut self,
is_transient_cell: bool,
cell: CellId,
) -> Option<TypedSharedReference> {
if is_transient_cell {
remove!(self, TransientCellData { cell }).map(|sr| sr.into_typed(cell.type_id))
} else {
remove!(self, CellData { cell })
}
}
fn get_cell_data(&self, is_transient_cell: bool, cell: CellId) -> Option<TypedSharedReference> {
if is_transient_cell {
get!(self, TransientCellData { cell }).map(|sr| sr.clone().into_typed(cell.type_id))
} else {
get!(self, CellData { cell }).cloned()
}
}
fn has_cell_data(&self, is_transient_cell: bool, cell: CellId) -> bool {
if is_transient_cell {
self.has_key(&CachedDataItemKey::TransientCellData { cell })
} else {
self.has_key(&CachedDataItemKey::CellData { cell })
}
}
}

pub struct TaskGuardImpl<'a, B: BackingStorage> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use crate::{
AggregationUpdateQueue, ExecuteContext, Operation, TaskGuard,
invalidate::make_task_dirty_internal,
},
storage::{get, get_many, remove},
storage::{get_many, remove},
},
data::{CachedDataItem, CachedDataItemKey, CellRef},
};
Expand All @@ -24,12 +24,14 @@ use crate::{
#[allow(clippy::large_enum_variant)]
pub enum UpdateCellOperation {
InvalidateWhenCellDependency {
is_transient_cell: bool,
cell_ref: CellRef,
dependent_tasks: SmallVec<[TaskId; 4]>,
content: Option<TypedSharedReference>,
queue: AggregationUpdateQueue,
},
FinalCellChange {
is_transient_cell: bool,
cell_ref: CellRef,
content: Option<TypedSharedReference>,
queue: AggregationUpdateQueue,
Expand All @@ -46,6 +48,7 @@ impl UpdateCellOperation {
task_id: TaskId,
cell: CellId,
content: CellContent,
is_transient_cell: bool,
#[cfg(feature = "verify_determinism")] verification_mode: VerificationMode,
#[cfg(not(feature = "verify_determinism"))] _verification_mode: VerificationMode,
mut ctx: impl ExecuteContext,
Expand All @@ -64,18 +67,17 @@ impl UpdateCellOperation {
let assume_unchanged =
!ctx.should_track_dependencies() || !task.has_key(&CachedDataItemKey::Dirty {});

let old_content = get!(task, CellData { cell });

if assume_unchanged {
if old_content.is_some() {
let has_old_content = task.has_cell_data(is_transient_cell, cell);
if has_old_content {
// Never update cells when recomputing if they already have a value.
// It's not expected that content changes during recomputation.

// Check if this assumption holds.
#[cfg(feature = "verify_determinism")]
if !is_stateful
&& matches!(verification_mode, VerificationMode::EqualityCheck)
&& content.as_ref() != old_content
&& content != task.get_cell_data(is_transient_cell, cell)
{
let task_description = ctx.get_task_description(task_id);
let cell_type = turbo_tasks::registry::get_value_type(cell.type_id).global_name;
Expand Down Expand Up @@ -115,12 +117,14 @@ impl UpdateCellOperation {
// tasks and after that set the new cell content. When the cell content is unset,
// readers will wait for it to be set via InProgressCell.

let old_content = task.remove(&CachedDataItemKey::CellData { cell });
let old_content =
task.remove(&CachedDataItemKey::cell_data(is_transient_cell, cell));

drop(task);
drop(old_content);

UpdateCellOperation::InvalidateWhenCellDependency {
is_transient_cell,
cell_ref: CellRef {
task: task_id,
cell,
Expand All @@ -138,12 +142,13 @@ impl UpdateCellOperation {
// So we can just update the cell content.

let old_content = if let Some(new_content) = content {
task.insert(CachedDataItem::CellData {
task.insert(CachedDataItem::cell_data(
is_transient_cell,
cell,
value: new_content,
})
new_content,
))
} else {
task.remove(&CachedDataItemKey::CellData { cell })
task.remove(&CachedDataItemKey::cell_data(is_transient_cell, cell))
};

let in_progress_cell = remove!(task, InProgressCell { cell });
Expand All @@ -163,6 +168,7 @@ impl Operation for UpdateCellOperation {
ctx.operation_suspend_point(&self);
match self {
UpdateCellOperation::InvalidateWhenCellDependency {
is_transient_cell,
cell_ref,
ref mut dependent_tasks,
ref mut content,
Expand Down Expand Up @@ -203,24 +209,23 @@ impl Operation for UpdateCellOperation {
}
if dependent_tasks.is_empty() {
self = UpdateCellOperation::FinalCellChange {
is_transient_cell,
cell_ref,
content: take(content),
queue: take(queue),
};
}
}
UpdateCellOperation::FinalCellChange {
is_transient_cell,
cell_ref: CellRef { task, cell },
content,
ref mut queue,
} => {
let mut task = ctx.task(task, TaskDataCategory::Data);

if let Some(content) = content {
task.add_new(CachedDataItem::CellData {
cell,
value: content,
})
task.add_new(CachedDataItem::cell_data(is_transient_cell, cell, content));
}

let in_progress_cell = remove!(task, InProgressCell { cell });
Expand Down
Loading
Loading