Extending the Framework
This guide shows how to extend the framework with custom workloads, expectations, runners, and topology helpers. Each section includes the trait outline and a minimal code example.
Adding a Workload
Steps:
- Implement `testing_framework_core::scenario::Workload`.
- Provide a name and any bundled expectations.
- Use `init` to derive inputs from topology/metrics; fail fast if prerequisites are missing.
- Use `start` to drive async traffic using `RunContext` clients.
- Expose the workload from `testing-framework/workflows` and optionally add a DSL helper.
Trait outline:
use async_trait::async_trait;
use testing_framework_core::scenario::{
DynError, Expectation, RunContext, RunMetrics, Workload,
};
use testing_framework_core::topology::generation::GeneratedTopology;
/// Minimal expectation used to illustrate bundling a check with a workload.
struct MyExpectation;
#[async_trait]
impl Expectation for MyExpectation {
    /// Identifier used for this expectation in logs and aggregated errors.
    fn name(&self) -> &str {
        "my_expectation"
    }

    /// Runs after workloads finish; return a descriptive `Err` on failure.
    /// This stub always passes.
    async fn evaluate(&mut self, _ctx: &RunContext) -> Result<(), DynError> {
        Ok(())
    }
}
/// Example workload that drives traffic at a configured rate.
pub struct MyWorkload {
    // Configuration fields
    /// Target traffic rate — presumably operations per second; confirm
    /// against the framework's conventions.
    target_rate: u64,
}
impl MyWorkload {
    /// Creates a workload targeting `target_rate` operations.
    ///
    /// Declared `const` so it can be used in const contexts, matching the
    /// `const fn` builder constructors shown later in this guide.
    pub const fn new(target_rate: u64) -> Self {
        Self { target_rate }
    }
}
#[async_trait]
impl Workload for MyWorkload {
    /// Name used to identify this workload in logs.
    fn name(&self) -> &str {
        "my_workload"
    }

    /// Default checks that should run alongside this workload (may be empty).
    fn expectations(&self) -> Vec<Box<dyn Expectation>> {
        // Return bundled expectations that should run with this workload.
        vec![Box::new(MyExpectation)]
    }

    /// Validates prerequisites (e.g., enough nodes, wallet data present)
    /// before the run starts, failing fast when the topology is unusable.
    fn init(
        &mut self,
        topology: &GeneratedTopology,
        _run_metrics: &RunMetrics,
    ) -> Result<(), DynError> {
        if topology.validators().is_empty() {
            Err("no validators available".into())
        } else {
            Ok(())
        }
    }

    /// Drives async activity: submit transactions, query nodes, etc.
    async fn start(&self, ctx: &RunContext) -> Result<(), DynError> {
        for validator in ctx.node_clients().validator_clients() {
            let info = validator.consensus_info().await?;
            tracing::info!(height = info.height, "workload queried node");
        }
        Ok(())
    }
}
Key points:
- `name()` identifies the workload in logs.
- `expectations()` bundles default checks (can be empty).
- `init()` validates the topology before the run starts.
- `start()` executes concurrently with other workloads; it should complete before the run duration expires.
See Example: New Workload & Expectation for a complete, runnable example.
Adding an Expectation
Steps:
- Implement `testing_framework_core::scenario::Expectation`.
- Use `start_capture` to snapshot baseline metrics (optional).
- Use `evaluate` to assert outcomes after workloads finish.
- Return descriptive errors; the runner aggregates them.
- Export the expectation from `testing-framework/workflows` if it is reusable.
Trait outline:
use async_trait::async_trait;
use testing_framework_core::scenario::{DynError, Expectation, RunContext};
/// Asserts that the chain advanced by at least `expected_value` blocks
/// between capture and evaluation.
pub struct MyExpectation {
    /// Minimum number of new blocks required for the expectation to pass.
    expected_value: u64,
    /// Height snapshotted in `start_capture`; `None` until capture runs.
    captured_baseline: Option<u64>,
}
impl MyExpectation {
    /// Creates an expectation requiring at least `expected_value` new blocks.
    /// The baseline starts unset and is filled in by `start_capture`.
    ///
    /// Declared `const` for consistency with the other constructors in this
    /// guide; it remains callable exactly as before.
    pub const fn new(expected_value: u64) -> Self {
        Self {
            expected_value,
            captured_baseline: None,
        }
    }
}
#[async_trait]
impl Expectation for MyExpectation {
    /// Identifier used for this expectation in logs.
    fn name(&self) -> &str {
        "my_expectation"
    }

    /// Optional hook: snapshots the starting height before workloads run.
    async fn start_capture(&mut self, ctx: &RunContext) -> Result<(), DynError> {
        let validator = ctx
            .node_clients()
            .validator_clients()
            .first()
            .ok_or("no validators")?;
        let info = validator.consensus_info().await?;
        self.captured_baseline = Some(info.height);
        tracing::info!(baseline = self.captured_baseline, "captured baseline");
        Ok(())
    }

    /// Runs after workloads finish; checks that the chain grew by at least
    /// `expected_value` blocks relative to the captured baseline.
    async fn evaluate(&mut self, ctx: &RunContext) -> Result<(), DynError> {
        let validator = ctx
            .node_clients()
            .validator_clients()
            .first()
            .ok_or("no validators")?;
        let current_height = validator.consensus_info().await?.height;
        // A missing baseline is treated as height 0.
        let delta = current_height.saturating_sub(self.captured_baseline.unwrap_or(0));
        if delta >= self.expected_value {
            tracing::info!(delta, "expectation passed");
            Ok(())
        } else {
            Err(format!(
                "expected at least {} blocks, got {}",
                self.expected_value, delta
            )
            .into())
        }
    }
}
Key points:
- `name()` identifies the expectation in logs.
- `start_capture()` runs before workloads start (optional).
- `evaluate()` runs after workloads finish; return descriptive errors.
- Expectations run sequentially; keep them fast.
Adding a Runner (Deployer)
Steps:
- Implement `testing_framework_core::scenario::Deployer<Caps>` for your capability type.
- Deploy infrastructure and return a `Runner`.
- Construct `NodeClients` and spawn a `BlockFeed`.
- Build a `RunContext` and provide a `CleanupGuard` for teardown.
Trait outline:
use async_trait::async_trait;
use testing_framework_core::scenario::{
CleanupGuard, Deployer, DynError, Metrics, NodeClients, RunContext, Runner, Scenario,
spawn_block_feed,
};
use testing_framework_core::topology::deployment::Topology;
/// Example deployer that launches scenarios on a custom backend.
pub struct MyDeployer {
    // Configuration: cluster connection details, etc.
}
impl MyDeployer {
pub fn new() -> Self {
Self {}
}
}
#[async_trait]
impl Deployer<()> for MyDeployer {
    type Error = DynError;

    /// Deploys the scenario and returns a fully prepared `Runner`.
    ///
    /// The sequence is:
    /// 1. Launch nodes using `scenario.topology()`
    /// 2. Wait for readiness (e.g., consensus info endpoint responds)
    /// 3. Build `NodeClients` for validators/executors
    /// 4. Spawn a block feed for expectations (optional but recommended)
    /// 5. Create a `NodeControlHandle` if you support restarts (optional)
    /// 6. Return a `Runner` wrapping `RunContext` + `CleanupGuard`
    async fn deploy(&self, scenario: &Scenario<()>) -> Result<Runner, Self::Error> {
        tracing::info!("deploying scenario with MyDeployer");

        // Some(topology) if you spawned one.
        let spawned_topology: Option<Topology> = None;

        // Or NodeClients::from_topology(...) when real nodes exist.
        let clients = NodeClients::default();
        let feed_client = clients
            .any_client()
            .ok_or("no api clients available")?
            .clone();
        let (block_feed, block_feed_guard) = spawn_block_feed(feed_client).await?;

        let metrics = Metrics::empty(); // or Metrics::from_prometheus(...)
        let node_control = None; // or Some(Arc<dyn NodeControlHandle>)

        let context = RunContext::new(
            scenario.topology().clone(),
            spawned_topology,
            clients,
            scenario.duration(),
            metrics,
            block_feed,
            node_control,
        );

        // If you also have other resources to clean up (containers/pods/etc),
        // wrap them in your own CleanupGuard implementation and call
        // CleanupGuard::cleanup(Box::new(block_feed_guard)) inside it.
        Ok(Runner::new(context, Some(Box::new(block_feed_guard))))
    }
}
Key points:
- `deploy()` must return a fully prepared `Runner`.
- Block until nodes are ready before returning (avoid false negatives).
- Use a `CleanupGuard` to tear down resources on failure (and on `RunHandle` drop).
- If you want chaos workloads, also provide a `NodeControlHandle` via `RunContext`.
Adding Topology Helpers
Steps:
- Extend `testing_framework_core::topology::config::TopologyBuilder` with new layouts.
- Keep defaults safe: ensure at least one participant, clamp dispersal factors.
- Consider adding configuration presets for specialized parameters
Example:
use testing_framework_core::topology::{
config::TopologyBuilder,
configs::network::Libp2pNetworkLayout,
};
/// Extension trait adding custom layout shortcuts to `TopologyBuilder`.
pub trait TopologyBuilderExt {
    /// Applies the `Full` libp2p network layout — presumably a full mesh;
    /// confirm against the `Libp2pNetworkLayout` docs.
    fn network_full(self) -> Self;
}
impl TopologyBuilderExt for TopologyBuilder {
    fn network_full(self) -> Self {
        // Delegates to the builder's layout setter with the `Full` variant.
        self.with_network_layout(Libp2pNetworkLayout::Full)
    }
}
Key points:
- Maintain method chaining (the example's consuming builder returns `Self`; a borrowing builder would return `&mut Self` instead).
- Validate inputs: clamp factors, enforce minimums.
- Document assumptions (e.g., "requires at least 4 nodes").
Adding a DSL Helper
To expose your custom workload through the high-level DSL, add a trait extension:
use async_trait::async_trait;
use testing_framework_core::scenario::{DynError, RunContext, ScenarioBuilder, Workload};
/// Builder for `MyWorkload`; all fields start at their `Default` values.
#[derive(Default)]
pub struct MyWorkloadBuilder {
    /// Target rate for generated traffic (defaults to 0).
    target_rate: u64,
    /// Example toggle (defaults to false).
    some_option: bool,
}
impl MyWorkloadBuilder {
    /// Sets the target rate; consumes and returns the builder for chaining.
    pub const fn target_rate(mut self, target_rate: u64) -> Self {
        self.target_rate = target_rate;
        self
    }

    /// Sets the example option; consumes and returns the builder for chaining.
    pub const fn some_option(mut self, some_option: bool) -> Self {
        self.some_option = some_option;
        self
    }

    /// Finalizes the builder into a `MyWorkload`.
    pub const fn build(self) -> MyWorkload {
        let target_rate = self.target_rate;
        let some_option = self.some_option;
        MyWorkload {
            target_rate,
            some_option,
        }
    }
}
/// Workload produced by `MyWorkloadBuilder`.
pub struct MyWorkload {
    /// Target rate for generated traffic.
    target_rate: u64,
    /// Example toggle; semantics are defined by the workload author.
    some_option: bool,
}
#[async_trait]
impl Workload for MyWorkload {
    /// Identifier used for this workload in logs.
    fn name(&self) -> &str {
        "my_workload"
    }

    /// No-op driver; a real workload would generate traffic here.
    async fn start(&self, _ctx: &RunContext) -> Result<(), DynError> {
        Ok(())
    }
}
/// DSL extension that registers `MyWorkload` on a `ScenarioBuilder`.
pub trait MyWorkloadDsl {
    /// Configures a fresh `MyWorkloadBuilder` via `f` and attaches the
    /// resulting workload to the scenario.
    fn my_workload_with(
        self,
        f: impl FnOnce(MyWorkloadBuilder) -> MyWorkloadBuilder,
    ) -> Self;
}
impl MyWorkloadDsl for ScenarioBuilder {
    /// Runs `f` over a default builder and registers the built workload.
    fn my_workload_with(
        self,
        f: impl FnOnce(MyWorkloadBuilder) -> MyWorkloadBuilder,
    ) -> Self {
        self.with_workload(f(MyWorkloadBuilder::default()).build())
    }
}
Users can then call:
ScenarioBuilder::topology_with(|t| t.network_star().validators(1).executors(1))
.my_workload_with(|w| {
w.target_rate(10)
.some_option(true)
})
.build()
See Also
- API Levels: Builder DSL vs. Direct - Understanding the two API levels
- Custom Workload Example - Complete runnable example
- Internal Crate Reference - Where to add new code