16 Min to complete
In this lesson, you’ll build a simple Rust application that will connect to a ScyllaDB cluster and perform basic queries. For establishing communication between the application and the ScyllaDB server, you will use the scylla-rust-driver, which is a source-available ScyllaDB driver for Rust.
The ScyllaDB Rust Driver is a client-side driver for ScyllaDB written in pure Rust with a fully async API using Tokio. Although optimized for ScyllaDB, the driver is also compatible with Apache Cassandra®.
Starting ScyllaDB in Docker
Before starting the cluster, make sure the aio-max-nr value is high enough (1048576 or more).
This parameter sets the maximum number of concurrent asynchronous I/O (AIO) requests that the Linux kernel allows. A high enough value helps ScyllaDB perform well under heavy I/O workloads.
Check the value:
cat /proc/sys/fs/aio-max-nr
If it needs to be changed:
echo "fs.aio-max-nr = 1048576" >> /etc/sysctl.conf
sysctl -p /etc/sysctl.conf
ubuntu $ cat /proc/sys/fs/aio-max-nr
65536
ubuntu $ echo "fs.aio-max-nr = 1048576" >> /etc/sysctl.conf
ubuntu $ sysctl -p /etc/sysctl.conf
fs.inotify.max_user_watches = 524288
fs.aio-max-nr = 1048576
ubuntu $
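The same check can be done from Rust. The following is a minimal sketch that uses only the standard library; the helper names `parse_aio_max_nr` and `is_high_enough` are made up for this illustration, and the threshold is the 1048576 value mentioned above.

```rust
use std::fs;

/// Minimum value recommended in the text above.
const REQUIRED_AIO_MAX_NR: u64 = 1_048_576;

/// Parses the contents of /proc/sys/fs/aio-max-nr (a decimal number
/// followed by a newline). Returns None if the text is not a number.
fn parse_aio_max_nr(contents: &str) -> Option<u64> {
    contents.trim().parse().ok()
}

/// True when the value meets the recommended minimum.
fn is_high_enough(value: u64) -> bool {
    value >= REQUIRED_AIO_MAX_NR
}

fn main() {
    // The file only exists on Linux; elsewhere the read simply fails.
    match fs::read_to_string("/proc/sys/fs/aio-max-nr") {
        Ok(contents) => match parse_aio_max_nr(&contents) {
            Some(v) if is_high_enough(v) => println!("aio-max-nr = {}: OK", v),
            Some(v) => println!("aio-max-nr = {}: below {}", v, REQUIRED_AIO_MAX_NR),
            None => println!("could not parse aio-max-nr"),
        },
        Err(e) => println!("could not read aio-max-nr: {}", e),
    }
}
```

With the value 65536 from the terminal session above, the check would report that the setting is too low.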
If you haven’t done so yet, download the example from git:
git clone https://github.com/scylladb/scylla-code-samples.git
cd scylla-code-samples/Rust_Scylla_Driver/ps-logger/
To quickly get ScyllaDB up and running, use the official Docker image:
docker run \
  -p 9042:9042/tcp \
  --name some-scylla \
  --hostname some-scylla \
  -d scylladb/scylla:5.2.0 \
  --smp 1 --memory=750M --overprovisioned 1
[guy@localhost ps-logger]$ docker run \
> -p 9042:9042/tcp \
> --name some-scylla \
> --hostname some-scylla \
> -d scylladb/scylla:4.5.0 \
> --smp 1 --memory=750M --overprovisioned 1
3dcf484465f0b82daf9206a1cac72de5e986d49326d07224db8ce7fadc2e145b
[guy@localhost ps-logger]$
Note that in this lesson, it is assumed that the ScyllaDB instance is run on a local machine.
Wait a few seconds until the node is up. Use cqlsh to create a keyspace and table on the ScyllaDB server:
docker exec -it some-scylla cqlsh
Data Schema
The application will be able to store and query temperature time-series data. Each measurement will contain the following information:
- The sensor ID for the sensor that measured the temperature
- The time the temperature was measured
- The temperature value
First, create a keyspace called tutorial:
CREATE KEYSPACE IF NOT EXISTS tutorial WITH REPLICATION = { 'class': 'SimpleStrategy', 'replication_factor': 1 };
As this is just an example, you’ll use SimpleStrategy with a single datacenter. The cluster has a single node, so set the Replication Factor to one.
Keep in mind that SimpleStrategy should not be used in production.
Based on the desired query being the temperature reported by a specific device for a given time interval, create the following table:
CREATE TABLE IF NOT EXISTS tutorial.temperature (
    device UUID,
    time timestamp,
    temperature smallint,
    PRIMARY KEY(device, time)
);
You can learn more about Basic Data Modeling here.
The application you’re building will be able to query all temperatures measured by a given device within a selected time frame. That’s why you will use the following SELECT query:
SELECT * FROM tutorial.temperature
    WHERE device = ?
    AND time > ?
    AND time < ?;
where ? will be replaced with actual values – device ID, time-from, and time-to, respectively.
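To see why `PRIMARY KEY(device, time)` suits this query, here is a small in-memory sketch. It is only a model and not how ScyllaDB stores data: a `BTreeMap` keyed by `(device, time)` keeps the rows of one device together and sorted by time, so the query becomes a range scan with two exclusive bounds. The `Table` alias and the integer device IDs are assumptions made for this illustration.

```rust
use std::collections::BTreeMap;
use std::ops::Bound::Excluded;

/// In-memory stand-in for the table: rows are keyed by (device, time),
/// so rows of one device sit together and are sorted by time.
/// Device IDs are plain integers here instead of UUIDs.
type Table = BTreeMap<(u32, i64), i16>;

/// Mirrors `WHERE device = ? AND time > ? AND time < ?`:
/// both time bounds are exclusive.
fn select_measurements(table: &Table, device: u32, from: i64, to: i64) -> Vec<(i64, i16)> {
    // An empty or inverted interval matches nothing
    // (BTreeMap::range would panic on it).
    if from >= to {
        return Vec::new();
    }
    table
        .range((Excluded((device, from)), Excluded((device, to))))
        .map(|(&(_, time), &temperature)| (time, temperature))
        .collect()
}

fn main() {
    let mut table = Table::new();
    table.insert((1, 100), 40);
    table.insert((1, 200), 50);
    table.insert((1, 300), 60);
    table.insert((2, 200), 70); // a different device

    // Only the row at time 200 lies strictly between 100 and 300.
    println!("{:?}", select_measurements(&table, 1, 100, 300));
}
```

Because both comparisons are strict, rows whose time equals either bound are not returned.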
Next, exit the CQL shell:
exit
Rust and Connection to the DB
If you don’t already have Rust and Cargo installed, go ahead and install them using the rustup.rs toolchain installer:
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh
This will help you to install the Rust compiler with other helpful tools.
The application name is temperature, and the required dependencies are defined in the Cargo.toml file:
uuid = {version = "0.8", features = ["v4"]}
tokio = {version = "1.1.0", features = ["full"]}
scylla = "0.3.1"
futures = "0.3.6"
chrono = "0.4.0"
Where:
- uuid – Package that provides UUID.
- tokio – Provides the async runtime to execute database queries in.
- scylla – Rust ScyllaDB/Cassandra driver.
- futures – Utilities for working with futures in asynchronous code.
- chrono – Package for working with time.
The file `/src/result.rs` contains just:
pub type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;
You’ll use this `Result` type in the application code to make error handling easier. It lets a function return either a value of a generic type `T` or any error that can be converted to a `Box<dyn std::error::Error>`.
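As a quick illustration of what this alias buys you, the sketch below returns two different kinds of errors through the same `Result<T>`. The function `parse_temperature` is hypothetical and is not part of the tutorial's code.

```rust
use std::error::Error;

// Same alias as in /src/result.rs.
type Result<T> = std::result::Result<T, Box<dyn Error>>;

/// Hypothetical helper: parses a temperature reading from text.
fn parse_temperature(raw: &str) -> Result<i16> {
    // `?` converts the ParseIntError into a Box<dyn Error>.
    let value: i16 = raw.trim().parse()?;
    if value < -273 {
        // A plain string message can also become a Box<dyn Error>.
        return Err("temperature below absolute zero".into());
    }
    Ok(value)
}

fn main() {
    println!("{:?}", parse_temperature("40"));
    println!("{:?}", parse_temperature("forty").map_err(|e| e.to_string()));
    println!("{:?}", parse_temperature("-300").map_err(|e| e.to_string()));
}
```

The trade-off is that callers can no longer match on a concrete error type without downcasting, which is acceptable for a small example application like this one.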
Now it’s possible to declare the `result` module and use the `Result` type in `/src/main.rs`:
use crate::result::Result;
mod result;
The `main` function works asynchronously by using `tokio`. The following makes sure it returns the result:
#[tokio::main]
async fn main() -> Result<()> {
    ...
}
The file `/src/db.rs` will hold the logic for working with the ScyllaDB instance. The first step is to establish a database session.
For this example, you won’t use authentication:
Note: See here for an example on user authentication.
use scylla::{Session, SessionBuilder};
use crate::Result;

pub async fn create_session(uri: &str) -> Result<Session> {
    SessionBuilder::new()
        .known_node(uri)
        .build()
        .await
        .map_err(From::from)
}
The file `/src/main.rs` imports the `db` module:
mod result;
mod db;
And then it initializes the session like so:
#[tokio::main]
async fn main() -> Result<()> {
    println!("connecting to db");
    let uri = std::env::var("SCYLLA_URI").unwrap_or_else(|_| "127.0.0.1:9042".to_string());
    let session = db::create_session(&uri).await?;
    todo!()
}
With this, you’ll use the `SCYLLA_URI` environment variable or `127.0.0.1:9042` if not provided.
Notice the `.await` after `create_session`. That’s because `async` functions return a Future. Futures can be awaited inside other `async` functions to get their actual value, which in this case is `Result<Session>`. Lastly, the `?` after `.await` makes sure that if `create_session` returns an error instead of a session, the error is propagated up and the application terminates, printing the error.
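The interplay of `.await` and `?` can be tried without a database. The sketch below is only an approximation of the tutorial's setup: `connect` is a hypothetical stand-in for `create_session`, and a tiny hand-written `block_on` replaces Tokio so that the example needs nothing beyond the standard library (it assumes a recent Rust toolchain for `std::pin::pin!`).

```rust
use std::error::Error;
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

type Result<T> = std::result::Result<T, Box<dyn Error>>;

/// Wakes the blocked thread when the future can make progress.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Minimal stand-in for an async runtime: polls one future to completion.
fn block_on<F: Future>(future: F) -> F::Output {
    let mut future = pin!(future);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match future.as_mut().poll(&mut cx) {
            Poll::Ready(output) => return output,
            Poll::Pending => thread::park(),
        }
    }
}

/// Hypothetical stand-in for `create_session`: fails on an empty address.
async fn connect(uri: &str) -> Result<String> {
    if uri.is_empty() {
        return Err("no address given".into());
    }
    Ok(format!("session to {}", uri))
}

/// Awaits `connect`; `?` returns early with the error if it failed.
async fn run(uri: &str) -> Result<String> {
    let session = connect(uri).await?;
    Ok(session)
}

fn main() {
    println!("{:?}", block_on(run("127.0.0.1:9042")).map_err(|e| e.to_string()));
    println!("{:?}", block_on(run("")).map_err(|e| e.to_string()));
}
```

In the real application, `#[tokio::main]` plays the role of `block_on` here: it starts the runtime and drives the `main` future to completion.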
**Note**: In this lesson, there is a single node, so choosing a load balancing strategy doesn’t make a difference. The available load balancing strategies are listed in the documentation. By default, `Token aware Round robin` is used.
Next, the file `/src/db.rs` defines functions that create the keyspace and the table for storing temperature measurements. Each function runs the corresponding CQL statement:
use scylla::{IntoTypedRows, Session, SessionBuilder};
use uuid::Uuid;
use crate::{Duration, Result, TemperatureMeasurement};

static CREATE_KEYSPACE_QUERY: &str = r#"
  CREATE KEYSPACE IF NOT EXISTS tutorial
    WITH REPLICATION = {
      'class': 'SimpleStrategy',
      'replication_factor': 1
    };
"#;

static CREATE_TEMPERATURE_TABLE_QUERY: &str = r#"
  CREATE TABLE IF NOT EXISTS tutorial.temperature (
    device UUID,
    time timestamp,
    temperature smallint,
    PRIMARY KEY(device, time)
  );
"#;

pub async fn initialize(session: &Session) -> Result<()> {
    create_keyspace(session).await?;
    create_temperature_table(session).await?;
    Ok(())
}

async fn create_keyspace(session: &Session) -> Result<()> {
    session
        .query(CREATE_KEYSPACE_QUERY, ())
        .await
        .map(|_| ())
        .map_err(From::from)
}

async fn create_temperature_table(session: &Session) -> Result<()> {
    session
        .query(CREATE_TEMPERATURE_TABLE_QUERY, ())
        .await
        .map(|_| ())
        .map_err(From::from)
}
The `main` function calls `db::initialize` to create the keyspace and table:
#[tokio::main]
async fn main() -> Result<()> {
    println!("connecting to db");
    let uri = std::env::var("SCYLLA_URI").unwrap_or_else(|_| "127.0.0.1:9042".to_string());
    let session = db::create_session(&uri).await?;
    db::initialize(&session).await?;
    todo!()
}
The file `/src/temperature_measurement.rs` defines a structure that will represent a single temperature measurement:
use uuid::Uuid;
use scylla::FromRow;
use scylla::ValueList;
use crate::Duration;

#[derive(Debug, FromRow, ValueList)]
pub struct TemperatureMeasurement {
    pub device: Uuid,
    pub time: Duration,
    pub temperature: i16,
}
Next, the file `/src/duration.rs` contains a custom `Duration` type. It wraps `chrono::Duration` so that the same type can be both serialized as a query value and deserialized from a query result:
use scylla::frame::response::result::CqlValue;
use scylla::frame::value::{Value, ValueTooBig};
use scylla::frame::{
    response::cql_to_rust::{FromCqlVal, FromCqlValError},
    value::Timestamp,
};

#[derive(Debug)]
pub struct Duration(chrono::Duration);

impl Duration {
    pub fn seconds(secs: i64) -> Self {
        Self(chrono::Duration::seconds(secs))
    }
}

impl Value for Duration {
    fn serialize(&self, buf: &mut Vec<u8>) -> Result<(), ValueTooBig> {
        Timestamp(self.0).serialize(buf)
    }
}

impl FromCqlVal<Option<CqlValue>> for Duration {
    fn from_cql(cql_val: Option<CqlValue>) -> Result<Self, FromCqlValError> {
        chrono::Duration::from_cql(cql_val).map(Self)
    }
}
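To make the wrapper idea more concrete, the sketch below serializes a newtype by hand, without the driver. It assumes the usual CQL wire layout for a `timestamp` value (a 4-byte big-endian length followed by an 8-byte big-endian count of milliseconds since the Unix epoch). The type `EpochDuration` and its methods are made up for this illustration and are not the driver's API.

```rust
use std::convert::{TryFrom, TryInto};
use std::time::Duration;

/// Newtype over a duration since the Unix epoch, similar in spirit
/// to the tutorial's wrapper around `chrono::Duration`.
#[derive(Debug, PartialEq)]
struct EpochDuration(Duration);

impl EpochDuration {
    /// Appends a 4-byte big-endian length (always 8) followed by the
    /// value as 8-byte big-endian milliseconds.
    fn serialize(&self, buf: &mut Vec<u8>) {
        // Durations too large for i64 milliseconds would be truncated
        // by this cast; a real implementation should report an error.
        let millis = self.0.as_millis() as i64;
        buf.extend_from_slice(&8i32.to_be_bytes());
        buf.extend_from_slice(&millis.to_be_bytes());
    }

    /// Decodes the 8-byte value part. Returns None on a wrong length
    /// or a negative timestamp, which this simplified type cannot hold.
    fn deserialize(value: &[u8]) -> Option<Self> {
        let bytes: [u8; 8] = value.try_into().ok()?;
        let millis = u64::try_from(i64::from_be_bytes(bytes)).ok()?;
        Some(EpochDuration(Duration::from_millis(millis)))
    }
}

fn main() {
    let one_second = EpochDuration(Duration::from_secs(1));
    let mut buf = Vec::new();
    one_second.serialize(&mut buf);
    println!("{:?}", buf);
    // Skip the 4-byte length prefix when decoding.
    println!("{:?}", EpochDuration::deserialize(&buf[4..]));
}
```

The tutorial's `Duration` avoids writing this logic itself by delegating to the driver's `Timestamp` type for serialization and to `chrono::Duration` for deserialization.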
In addition to the standard `Debug`, `TemperatureMeasurement` derives two traits provided by the driver: `FromRow` and `ValueList`.
`FromRow` converts database rows into instances of `TemperatureMeasurement`, and `ValueList` lets you pass an instance of `TemperatureMeasurement` as the values argument of a query instead of listing all of its fields separately.
A derive generates the trait implementation at compile time by running the associated procedural macro. Procedural macros and derive are an advanced topic that is out of the scope of this lesson.
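To get a feel for what a derive saves you from writing, here is a hand-written conversion in the same spirit. Everything in it is a simplified assumption for illustration: the `Value` enum, the `FromRow` trait, and the struct with a text device ID are hypothetical and much simpler than the driver's real types.

```rust
/// Simplified column value; the real driver has a much richer type.
#[derive(Debug, Clone, PartialEq)]
enum Value {
    Text(String),
    BigInt(i64),
    SmallInt(i16),
}

/// Hypothetical, simplified row-conversion trait.
trait FromRow: Sized {
    fn from_row(row: Vec<Value>) -> Result<Self, String>;
}

#[derive(Debug, PartialEq)]
struct TemperatureMeasurement {
    device: String,
    time: i64,
    temperature: i16,
}

// Roughly the kind of code a derive macro writes for you:
// take the columns in field order and check each one's type.
impl FromRow for TemperatureMeasurement {
    fn from_row(row: Vec<Value>) -> Result<Self, String> {
        match row.as_slice() {
            [Value::Text(device), Value::BigInt(time), Value::SmallInt(temperature)] => Ok(Self {
                device: device.clone(),
                time: *time,
                temperature: *temperature,
            }),
            other => Err(format!("unexpected row shape: {:?}", other)),
        }
    }
}

fn main() {
    let row = vec![
        Value::Text("sensor-1".to_string()),
        Value::BigInt(1000),
        Value::SmallInt(40),
    ];
    println!("{:?}", TemperatureMeasurement::from_row(row));
    println!("{:?}", TemperatureMeasurement::from_row(vec![Value::SmallInt(40)]));
}
```

With a derive, this boilerplate is generated for every struct, and it stays in sync when fields are added or reordered.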
The file `/src/db.rs` also defines the insert query. ScyllaDB replaces each ? with the corresponding value:
static ADD_MEASUREMENT_QUERY: &str = r#"
  INSERT INTO tutorial.temperature (device, time, temperature)
    VALUES (?, ?, ?);
"#;

pub async fn add_measurement(session: &Session, measurement: TemperatureMeasurement) -> Result<()> {
    session
        .query(ADD_MEASUREMENT_QUERY, measurement)
        .await
        .map(|_| ())
        .map_err(From::from)
}
Here `ValueList` allows you to use query strings with ? as a placeholder for dynamic values. The values themselves come from an instance of the struct: its fields are bound to the placeholders in declaration order, so they are declared in the same order as the columns in the query.
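The positional pairing of placeholders and values can be sketched as follows. This is only an illustration of the idea: the `ValueList` trait and the `render` function below are hypothetical, and a real driver sends the values separately from the query text rather than pasting them into the string.

```rust
/// Hypothetical stand-in for the value-list idea: a struct that can
/// list its fields as positional query values.
trait ValueList {
    fn values(&self) -> Vec<String>;
}

struct TemperatureMeasurement {
    device: String,
    time: i64,
    temperature: i16,
}

impl ValueList for TemperatureMeasurement {
    fn values(&self) -> Vec<String> {
        // Same order as the columns in the INSERT statement.
        vec![
            self.device.clone(),
            self.time.to_string(),
            self.temperature.to_string(),
        ]
    }
}

/// Pairs each `?` with the value at the same position, for display only.
/// Fails when the number of placeholders and values differ.
fn render(query: &str, values: &[String]) -> Result<String, String> {
    let placeholders = query.matches('?').count();
    if placeholders != values.len() {
        return Err(format!(
            "{} placeholders but {} values",
            placeholders,
            values.len()
        ));
    }
    let mut parts = query.split('?');
    let mut out = String::from(parts.next().unwrap_or(""));
    for (value, rest) in values.iter().zip(parts) {
        out.push_str(value);
        out.push_str(rest);
    }
    Ok(out)
}

fn main() {
    let m = TemperatureMeasurement {
        device: "sensor-1".to_string(),
        time: 1000,
        temperature: 40,
    };
    println!("{:?}", render("VALUES (?, ?, ?);", &m.values()));
    println!("{:?}", render("VALUES (?, ?);", &m.values()));
}
```

Keeping values out of the query text is also what protects placeholder-based queries from injection problems, which is one reason drivers bind values instead of formatting strings.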
Reading Measurements
Next, the select-query logic is defined in the `/src/db.rs` module:
static SELECT_MEASUREMENTS_QUERY: &str = r#"
  SELECT * FROM tutorial.temperature
    WHERE device = ?
      AND time > ?
      AND time < ?;
"#;

pub async fn select_measurements(
    session: &Session,
    device: Uuid,
    time_from: Duration,
    time_to: Duration,
) -> Result<Vec<TemperatureMeasurement>> {
    session
        .query(SELECT_MEASUREMENTS_QUERY, (device, time_from, time_to))
        .await?
        .rows
        .unwrap_or_default()
        .into_typed::<TemperatureMeasurement>()
        .map(|v| v.map_err(From::from))
        .collect()
}
The important steps are:
- Make a select query with the specified parameters (device ID, start and end date).
- Await the response and take its `rows` field.
- The response might contain no rows at all (`rows` is an `Option`); `unwrap_or_default` ensures that you get an empty `Vec` in that case.
- Once the rows are obtained, convert each row by using `into_typed::<TemperatureMeasurement>()`, which relies on the implementation generated by the `FromRow` derive macro.
- `into_typed` yields a `Result` for each row because converting a row might fail. With `.map(|v| v.map_err(From::from))` you ensure that each row’s error is converted to the generic error defined in `/src/result.rs`.
- Finally, `collect` saves the iterated values to a vector.
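The last three steps can be tried in isolation with plain standard-library types. In this sketch, `parse_row` is a hypothetical stand-in for the per-row conversion and the rows are plain strings; the chain of `unwrap_or_default`, `map` with `map_err(From::from)`, and `collect` has the same shape as in `select_measurements`.

```rust
use std::error::Error;

type Result<T> = std::result::Result<T, Box<dyn Error>>;

/// Stand-in for converting one raw row; fails on non-numeric input.
fn parse_row(raw: &str) -> std::result::Result<i16, std::num::ParseIntError> {
    raw.trim().parse()
}

/// `rows` is optional, like the `rows` field of the query result.
fn convert_rows(rows: Option<Vec<&str>>) -> Result<Vec<i16>> {
    rows.unwrap_or_default() // None becomes an empty Vec
        .into_iter()
        .map(|raw| parse_row(raw).map_err(From::from)) // per-row error conversion
        .collect() // stops at the first Err, otherwise builds the Vec
}

fn main() {
    println!("{:?}", convert_rows(None).map_err(|e| e.to_string()));
    println!("{:?}", convert_rows(Some(vec!["40", "60"])).map_err(|e| e.to_string()));
    println!("{:?}", convert_rows(Some(vec!["40", "oops"])).map_err(|e| e.to_string()));
}
```

Collecting into a `Result<Vec<_>>` means a single bad row turns the whole call into an error instead of silently dropping that row.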
Now, back in `/src/main.rs` you can see the rest of the `main` function, imports, and modules:
use uuid::Uuid;
use crate::duration::Duration;
use crate::result::Result;
use crate::temperature_measurement::TemperatureMeasurement;

mod db;
mod duration;
mod result;
mod temperature_measurement;

#[tokio::main]
async fn main() -> Result<()> {
    println!("connecting to db");
    let uri = std::env::var("SCYLLA_URI").unwrap_or_else(|_| "127.0.0.1:9042".to_string());
    let session = db::create_session(&uri).await?;
    db::initialize(&session).await?;

    println!("Adding measurements");
    let measurement = TemperatureMeasurement {
        device: Uuid::parse_str("72f6d49c-76ea-44b6-b1bb-9186704785db")?,
        time: Duration::seconds(1000000000001),
        temperature: 40,
    };
    db::add_measurement(&session, measurement).await?;

    let measurement = TemperatureMeasurement {
        device: Uuid::parse_str("72f6d49c-76ea-44b6-b1bb-9186704785db")?,
        time: Duration::seconds(1000000000003),
        temperature: 60,
    };
    db::add_measurement(&session, measurement).await?;

    println!("Selecting measurements");
    let measurements = db::select_measurements(
        &session,
        Uuid::parse_str("72f6d49c-76ea-44b6-b1bb-9186704785db")?,
        Duration::seconds(1000000000000),
        Duration::seconds(10000000000009),
    )
    .await?;
    println!(" >> Measurements: {:?}", measurements);
    Ok(())
}
To run the example:
cargo run
[guy@localhost ps-logger]$ cargo run
   Compiling autocfg v1.0.1
   Compiling libc v0.2.109
   Compiling proc-macro2 v1.0.33
   Compiling unicode-xid v0.2.2
   Compiling syn v1.0.82
   Compiling cfg-if v1.0.0
   Compiling futures-core v0.3.18
   Compiling memchr v2.4.1
   Compiling serde v1.0.130
   Compiling getrandom v0.1.16
   Compiling futures-channel v0.3.18
   Compiling futures-task v0.3.18
   Compiling log v0.4.8
   Compiling pin-project-lite v0.2.7
   Compiling ppv-lite86 v0.2.15
   Compiling futures-util v0.3.18
   Compiling futures-sink v0.3.18
   Compiling cfg-if v0.1.10
   Compiling smallvec v1.7.0
   Compiling futures-io v0.3.18
   Compiling slab v0.4.5
   Compiling pin-utils v0.1.0
   Compiling scopeguard v1.1.0
   Compiling snap v1.0.5
   Compiling lazy_static v1.4.0
   Compiling bytes v1.1.0
   Compiling byteorder v1.4.3
   Compiling once_cell v1.8.0
   Compiling either v1.6.1
   Compiling arc-swap v1.5.0
   Compiling histogram v0.6.9
   Compiling instant v0.1.12
   Compiling lock_api v0.4.5
   Compiling num-traits v0.2.14
   Compiling num-integer v0.1.44
   Compiling num-bigint v0.3.3
   Compiling num-iter v0.1.42
   Compiling num-rational v0.3.2
   Compiling tokio v1.14.0
   Compiling tracing-core v0.1.21
   Compiling itertools v0.10.3
   Compiling getrandom v0.2.3
   Compiling parking_lot_core v0.8.3
   Compiling num_cpus v1.13.0
   Compiling time v0.1.42
   Compiling signal-hook-registry v1.4.0
   Compiling mio v0.7.14
   Compiling dashmap v4.0.2
   Compiling rand_core v0.6.3
   Compiling uuid v0.8.2
   Compiling rand_core v0.5.1
   Compiling parking_lot v0.11.1
   Compiling rand_chacha v0.3.1
   Compiling rand_chacha v0.2.2
   Compiling rand v0.7.3
   Compiling rand v0.8.4
   Compiling quote v1.0.10
   Compiling num-complex v0.3.1
   Compiling chrono v0.4.11
   Compiling bigdecimal v0.2.2
   Compiling toml v0.5.8
   Compiling num v0.3.1
   Compiling compress v0.2.1
   Compiling thiserror-impl v1.0.30
   Compiling futures-macro v0.3.18
   Compiling tokio-macros v1.6.0
   Compiling tracing-attributes v0.1.18
   Compiling derivative v2.2.0
   Compiling scylla-macros v0.1.1
   Compiling tracing v0.1.29
   Compiling thiserror v1.0.30
   Compiling proc-macro-crate v1.1.0
   Compiling num_enum_derive v0.5.4
   Compiling num_enum v0.5.4
   Compiling futures-executor v0.3.18
   Compiling futures v0.3.18
   Compiling scylla v0.3.1
   Compiling temperature v0.1.0 (/home/guy/testing-rust/temperature)
    Finished dev [unoptimized + debuginfo] target(s) in 26.49s
     Running `target/debug/temperature .`
connecting to db
Adding measurements
Selecting measurements
 >> Measurements: [TemperatureMeasurement { device: 72f6d49c-76ea-44b6-b1bb-9186704785db, time: Duration(Duration { secs: 1000000000001, nanos: 0 }), temperature: 40 }, TemperatureMeasurement { device: 72f6d49c-76ea-44b6-b1bb-9186704785db, time: Duration(Duration { secs: 1000000000003, nanos: 0 }), temperature: 60 }]
[guy@localhost ps-logger]$
Conclusion
In this lesson, you created a simple Rust application that connects to a single-node ScyllaDB cluster and stores and selects temperature measurements from sensors.
Future topics not discussed in this lesson include query preparation, query batching, and execution of prepared queries. Another topic that is left aside is multi-node clusters. Stay tuned for more lessons and check out the scylla-rust-driver examples and documentation.