Let’s Go Asynchronous
Not only does zbus also provides with asynchronous API, most of the synchronous API you saw in action already, is in fact just thin wrappers around its asynchronous counterpart. Since you’re now already familiar the synchronous API, in this chapter we’ll focus on making the earlier code samples, asynchronous.
Establishing a connection
The only difference to that of synchronous Connection
API is that you use azync::Connection
type instead. This type’s API is almost identical to that of Connection
, except its asynchronous.
Moreover, it also provides a futures::stream::Stream
and futures::sink::Sink
implementations
to conveniently receive and send messages, respectively for the times when low-level API is more
appropriate for your use case.
Client
Similar to Connection
, you use azync::Proxy
type. Its constructors require azync::Connection
instead of Connection
. Moreover, dbus_proxy
macro generates an azync::Proxy
wrapper for you
as well. Let’s convert the last example in the previous chapter, to use the asynchronous connection
and proxy:
#![allow(unused)] fn main() { use futures_util::future::FutureExt; use zbus::{azync::Connection, dbus_proxy, Result}; use zvariant::{ObjectPath, OwnedObjectPath}; async_io::block_on(run()).unwrap(); async fn run() -> Result<()> { #[dbus_proxy( default_service = "org.freedesktop.GeoClue2", interface = "org.freedesktop.GeoClue2.Manager", default_path = "/org/freedesktop/GeoClue2/Manager" )] trait Manager { #[dbus_proxy(object = "Client")] fn get_client(&self); } #[dbus_proxy( default_service = "org.freedesktop.GeoClue2", interface = "org.freedesktop.GeoClue2.Client" )] trait Client { fn start(&self) -> Result<()>; fn stop(&self) -> Result<()>; #[dbus_proxy(property)] fn set_desktop_id(&mut self, id: &str) -> Result<()>; #[dbus_proxy(signal)] fn location_updated(&self, old: ObjectPath, new: ObjectPath) -> Result<()>; } #[dbus_proxy( default_service = "org.freedesktop.GeoClue2", interface = "org.freedesktop.GeoClue2.Location" )] trait Location { #[dbus_proxy(property)] fn latitude(&self) -> Result<f64>; #[dbus_proxy(property)] fn longitude(&self) -> Result<f64>; } let conn = Connection::new_system().await?; let manager = AsyncManagerProxy::new(&conn); let mut client = manager.get_client().await?; // Gotta do this, sorry! client.set_desktop_id("org.freedesktop.zbus").await?; client .connect_location_updated(move |_old, new| { let new = new.to_string(); let conn = conn.clone(); async move { let location = AsyncLocationProxy::builder(&conn).path(new)?.build(); println!( "Latitude: {}\nLongitude: {}", location.latitude().await?, location.longitude().await?, ); Ok(()) } .boxed() }) .await?; client.start().await?; // Wait till there is a signal that was handled. while client.next_signal().await?.is_some() {} Ok(()) } }
As you can see, nothing changed in the dbus_proxy
usage here and the rest largely remained the
same as well.
Receiving multiple signals, simultaneously
The asynchronous API also doesn’t include an equivalent of
SignalReceiver
. This is because
futures
crate (and others) already provide a rich API to
combine asynchronous operations in various ways. Let’s see that in action by converting the above
example again to receive multiple signals on different proxies:
#![allow(unused)] fn main() { use futures_util::future::FutureExt; use zbus::{azync::Connection, dbus_proxy, Result}; use zvariant::{ObjectPath, OwnedObjectPath}; async_io::block_on(run()).unwrap(); async fn run() -> Result<()> { #[dbus_proxy( default_service = "org.freedesktop.GeoClue2", interface = "org.freedesktop.GeoClue2.Manager", default_path = "/org/freedesktop/GeoClue2/Manager" )] trait Manager { #[dbus_proxy(object = "Client")] fn get_client(&self); } #[dbus_proxy( default_service = "org.freedesktop.GeoClue2", interface = "org.freedesktop.GeoClue2.Client" )] trait Client { fn start(&self) -> Result<()>; fn stop(&self) -> Result<()>; #[dbus_proxy(property)] fn set_desktop_id(&mut self, id: &str) -> Result<()>; #[dbus_proxy(signal)] fn location_updated(&self, old: ObjectPath, new: ObjectPath) -> Result<()>; } #[dbus_proxy( default_service = "org.freedesktop.GeoClue2", interface = "org.freedesktop.GeoClue2.Location" )] trait Location { #[dbus_proxy(property)] fn latitude(&self) -> Result<f64>; #[dbus_proxy(property)] fn longitude(&self) -> Result<f64>; } let conn = Connection::new_system().await?; let manager = AsyncManagerProxy::new(&conn); let mut client = manager.get_client().await?; // Everything else remains the same before this point. client.set_desktop_id("org.freedesktop.zbus").await?; let props = zbus::fdo::AsyncPropertiesProxy::builder(&conn) .destination("org.freedesktop.GeoClue2") .path(client.path())? .build(); props .connect_properties_changed(move |iface, changed, _| { for (name, value) in changed.iter() { println!("{}.{} changed to `{:?}`", iface, name, value); } async { Ok(()) }.boxed() }) .await?; client .connect_location_updated(move |_old, new| { let new = new.to_string(); let conn = conn.clone(); async move { let location = AsyncLocationProxy::builder(&conn) .path(new)? .build(); println!( "Latitude: {}\nLongitude: {}", location.latitude().await?, location.longitude().await?, ); Ok(()) } .boxed() }) .await?; client.start().await?; futures_util::try_join!( async { while props.next_signal().await?.is_some() {} Ok::<(),zbus::Error >(()) }, async { while client.next_signal().await?.is_some() {} // No need to specify type of Result each time Ok(()) } )?; Ok(()) } }
Server
No high-level server-side API are provided yet. Rest assured, it’s very high on our priority list. Stay tuned!