Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 4 additions & 6 deletions client/src/dbus/api/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ impl<'a> Service<'a> {
#[doc(alias = "CollectionCreated")]
pub async fn receive_collection_created(
&self,
) -> Result<impl Stream<Item = Collection<'_>>, Error> {
) -> Result<impl Stream<Item = Collection<'a>> + '_, Error> {
let stream = self.inner().receive_signal("CollectionCreated").await?;
let conn = self.inner().connection();
Ok(stream.filter_map(move |message| async move {
Expand All @@ -60,19 +60,17 @@ impl<'a> Service<'a> {
#[doc(alias = "CollectionDeleted")]
pub async fn receive_collection_deleted(
&self,
) -> Result<impl Stream<Item = Collection<'_>>, Error> {
) -> Result<impl Stream<Item = OwnedObjectPath>, Error> {
let stream = self.inner().receive_signal("CollectionDeleted").await?;
let conn = self.inner().connection();
Ok(stream.filter_map(move |message| async move {
let path = message.body().deserialize::<OwnedObjectPath>().ok()?;
Collection::new(conn, path).await.ok()
message.body().deserialize::<OwnedObjectPath>().ok()
}))
}

#[doc(alias = "CollectionChanged")]
pub async fn receive_collection_changed(
&self,
) -> Result<impl Stream<Item = Collection<'_>>, Error> {
) -> Result<impl Stream<Item = Collection<'a>> + '_, Error> {
let stream = self.inner().receive_signal("CollectionChanged").await?;
let conn = self.inner().connection();
Ok(stream.filter_map(move |message| async move {
Expand Down
77 changes: 50 additions & 27 deletions client/src/dbus/service.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use std::sync::Arc;

use futures_util::{Stream, StreamExt};
use zbus::zvariant::OwnedObjectPath;

use super::{api, Algorithm, Collection, Error, DEFAULT_COLLECTION};
use crate::Key;

Expand Down Expand Up @@ -97,15 +100,11 @@ impl<'a> Service<'a> {
///
/// Applications should make use of [`Service::default_collection`] instead.
pub async fn with_alias(&self, alias: &str) -> Result<Option<Collection<'a>>, Error> {
Ok(self.inner.read_alias(alias).await?.map(|collection| {
Collection::new(
Arc::clone(&self.inner),
Arc::clone(&self.session),
self.algorithm,
collection,
self.aes_key.clone(), // Cheap clone, it is an Arc,
)
}))
Ok(self
.inner
.read_alias(alias)
.await?
.map(|collection| self.new_collection(collection)))
}

/// Get a list of all the available collections.
Expand All @@ -115,15 +114,7 @@ impl<'a> Service<'a> {
.collections()
.await?
.into_iter()
.map(|collection| {
Collection::new(
Arc::clone(&self.inner),
Arc::clone(&self.session),
self.algorithm,
collection,
self.aes_key.clone(), // Cheap clone, it is an Arc,
)
})
.map(|collection| self.new_collection(collection))
.collect::<Vec<_>>())
}

Expand All @@ -139,15 +130,7 @@ impl<'a> Service<'a> {
self.inner
.create_collection(label, alias)
.await
.map(|collection| {
Collection::new(
Arc::clone(&self.inner),
Arc::clone(&self.session),
self.algorithm,
collection,
self.aes_key.clone(), // Cheap clone, it is an Arc,
)
})
.map(|collection| self.new_collection(collection))
}

/// Find a collection with it label.
Expand All @@ -160,6 +143,46 @@ impl<'a> Service<'a> {
}
Ok(None)
}

/// Stream yielding when new collections get created
pub async fn receive_collection_created(
&self,
) -> Result<impl Stream<Item = Collection<'a>> + '_, Error> {
Ok(self
.inner
.receive_collection_created()
.await?
.map(|collection| self.new_collection(collection)))
}

/// Stream yielding when existing collections get changed
pub async fn receive_collection_changed(
&self,
) -> Result<impl Stream<Item = Collection<'a>> + '_, Error> {
Ok(self
.inner
.receive_collection_changed()
.await?
.map(|collection| self.new_collection(collection)))
}

/// Stream yielding when existing collections get deleted
pub async fn receive_collection_deleted(
&self,
) -> Result<impl Stream<Item = OwnedObjectPath>, Error> {
self.inner.receive_collection_deleted().await
}

// Get public `Collection` from `api::Collection`
fn new_collection(&self, collection: api::Collection<'a>) -> Collection<'a> {
Collection::new(
Arc::clone(&self.inner),
Arc::clone(&self.session),
self.algorithm,
collection,
self.aes_key.clone(), // Cheap clone, it is an Arc,
)
}
}

#[cfg(test)]
Expand Down