This tutorial is dedicated to Rust smart-contract (canister) development on the Internet Computer (Dfinity) platform. After completing it, you’ll know how to use the ic-event-hub library APIs to perform efficient cross-canister integrations.
Before digging into this tutorial it is recommended to learn the basics of smart-contract development for the Internet Computer. Here are some good starting points: official website, dedicated to canister development on the IC; IC developer forum, where one could find an answer to almost any technical question.
Complete code of this tutorial is here.
Motivation
Smart-contracts are not cheap. Running a simple "hello world" smart-contract on the IC is at least 10 times more expensive than running a similar "hello world" service on AWS. And there is actually a very simple reason why this will always be the case: infrastructure costs.
When you run a "hello world" service on a single AWS EC2 instance, you use only a small amount of resources (memory, processing power and bandwidth). All of this happens on a single replica (it doesn't matter if the replica is virtual or not).
When you run the exact same service on the IC, you're using the same amount of resources, but (!) your program is processed by the whole subnet (7 replicas at the time of writing). Moreover, it is not only processed simultaneously on all these replicas; on top of that, each message to your program goes through the consensus protocol (which under the hood involves many more replica interactions).
This means that it is just wrong to compare computation costs between the IC and AWS without taking additional infrastructure costs into account. In order to make such a comparison, you have to imagine your program running not on a single EC2 instance, but on seven EC2 instances with the help of some kind of consensus protocol (e.g. Kafka) instead. This way your AWS setup will (roughly) provide the same set of security guarantees and is eligible for comparison with the IC.
Smart-contract developers on the IC understand that "time is money" like nobody else. The more efficient the code you write, the more money you'll save. Developers are now financially motivated to build faster programs, otherwise they just won't be able to sustain their services. Code optimizations and frequent refactoring now make much more sense from the business point of view: they cut the losses.
Efficient code -> less money lost -> more money in your pocket. Simple as that.
ic-event-hub helps save a lot of cycles by providing special event-batching functionality. Basically, it lets your canister accumulate messages you want to send to other canisters and then send them all together in a single batch.
How does it help with saving cycles? Let's look at this table from Dfinity's website. We're particularly interested in these two rows:
- Xnet Call - 260,000 cycles
- Xnet Byte Transmission - 1,000 cycles
It means that it is much cheaper to send one 1,000-byte message than to send a thousand 1-byte messages, because a fee is charged for each message sent to another canister.
Sending one 1,000-byte message would cost you
(260,000 + (1,000 * 1,000)) * 1 = 1,260,000
cycles, while sending a thousand 1-byte messages would cost you
(260,000 + 1,000) * 1,000 = 261,000,000
cycles, which is roughly 200 times more expensive.
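The same arithmetic can be checked with a few lines of plain Rust. This is only a sketch: the two constants are the fee figures quoted above, and the `send_cost` helper is a name made up for this illustration, not part of any IC library.

```rust
/// Fee charged per inter-canister (Xnet) call, in cycles (figure quoted above).
const XNET_CALL_FEE: u64 = 260_000;
/// Fee charged per transmitted byte, in cycles (figure quoted above).
const XNET_BYTE_FEE: u64 = 1_000;

/// Hypothetical helper: cost of sending `messages` messages
/// of `bytes_per_message` bytes each.
fn send_cost(messages: u64, bytes_per_message: u64) -> u64 {
    messages * (XNET_CALL_FEE + XNET_BYTE_FEE * bytes_per_message)
}

fn main() {
    let batched = send_cost(1, 1_000);
    let unbatched = send_cost(1_000, 1);
    println!("one 1,000-byte message:  {} cycles", batched);
    println!("1,000 one-byte messages: {} cycles", unbatched);
    println!("ratio: {:.1}x", unbatched as f64 / batched as f64);
}
```

The per-call fee dominates for small payloads, which is why grouping many small messages into one call pays off.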
So, if you find yourself in a situation where your canister sends lots of small (under 100 KB) messages to other canisters, it is better to find a tool that optimizes this process so you can save cycles. The ic-event-hub library is exactly that tool.
In this tutorial, we'll go through an example of such a situation and learn how to use ic-event-hub to fix it.
Task definition
Imagine the following task: we want data supplied by users to be available on multiple canisters.
A naive solution is to make users send the same piece of data to multiple canisters. This won't work, because there is no easy way to protect against malicious users (for example, a user could send different data to different canisters).
It means that we're left with only one option: make users send data to one canister and make this canister re-transmit the data to all other canisters. The only problem with this solution is that it is expensive: for each data piece sent by a user, we send N more messages (where N is the number of canisters) to propagate this piece of data to the other canisters. Let's use ic-event-hub to solve this problem.
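To get a feeling for the scale of the problem, here is a rough sketch that only counts inter-canister messages. Both function names and the `pieces_per_batch` parameter are hypothetical and exist only for this illustration; the real library decides batch boundaries by size and time, not by piece count.

```rust
/// Messages sent when every data piece is re-transmitted immediately:
/// one inter-canister call per piece per listener.
fn naive_messages(pieces: u64, listeners: u64) -> u64 {
    pieces * listeners
}

/// Messages sent when pieces are grouped into batches of at most
/// `pieces_per_batch` before being re-transmitted (hypothetical parameter).
fn batched_messages(pieces: u64, listeners: u64, pieces_per_batch: u64) -> u64 {
    assert!(pieces_per_batch > 0, "a batch must hold at least one piece");
    // ceiling division: a partially filled batch still costs one message
    let batches = (pieces + pieces_per_batch - 1) / pieces_per_batch;
    batches * listeners
}

fn main() {
    let (pieces, listeners) = (1_000, 5);
    println!("naive:   {} messages", naive_messages(pieces, listeners));
    println!("batched: {} messages", batched_messages(pieces, listeners, 100));
}
```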
Implementation
Let's start with the file system layout:
project/
  canisters/
    emitter/        // a canister that will accept user data
      actor.rs
      build.sh
      can.did
      cargo.toml
    listener/       // a canister that will receive re-transmitted user data
      actor.rs
      build.sh
      can.did
      cargo.toml
  e2e-test/         // typescript-based test suite to check our logic
    src/
      example.spec.ts
      deploy.ts
    dfx.json
    package.json
    tsconfig.json
As you can see, we have two canisters, an emitter and a listener, as well as a typescript project with tests. In this tutorial, instead of using the terminal and dfx directly, we'll use typescript for deployment and testing.
Emitter canister
This canister will receive data from users and re-transmit it to the listener canister in the form of events.
Dependencies
# project/emitter/cargo.toml
[package]
name = "emitter"
version = "0.1.0"
edition = "2018"
[lib]
crate-type = ["cdylib"]
path = "actor.rs"
[dependencies]
ic-cdk = "0.4.0"
ic-cdk-macros = "0.4.0"
candid = "0.7.12"
serde = "1.0.126"
ic-event-hub = "0.3.0"
ic-event-hub-macros = "0.3.0"
Buildscript
# project/emitter/build.sh
#!/usr/bin/env bash
# this part makes it possible to execute this script from another directory
SCRIPT=$(readlink -f "$0")
SCRIPTPATH=$(dirname "$SCRIPT")
cd "$SCRIPTPATH" || exit
cargo build --target wasm32-unknown-unknown --release --package emitter && \
ic-cdk-optimizer ./target/wasm32-unknown-unknown/release/emitter.wasm -o ./target/wasm32-unknown-unknown/release/emitter-opt.wasm
Canister logic
To make it more illustrative, let's make our canisters count how many pieces of data they process and how many inter-canister messages it takes.
The emitter canister is the one that receives data pieces and re-transmits them. One data piece means one request. Let's define its state and the init() function:
// project/emitter/actor.rs
#[derive(Default)]
pub struct RequestCounter {
    pub counter: u64,
}

static mut STATE: Option<RequestCounter> = None;

pub fn get_state() -> &'static mut RequestCounter {
    unsafe { STATE.as_mut().unwrap() }
}

#[init]
fn init() {
    unsafe {
        STATE = Some(RequestCounter::default());
    }
}
As you can see, it's pretty trivial. We only store the request count inside the state. Next, let's define everything ic-event-hub needs in order to work:
// project/emitter/actor.rs
#[derive(Event)]
pub struct MirrorEvent {
    pub data: Vec<u8>,
}

implement_event_emitter!(1_000_000_000 * 20, 500 * 1024);
We have a simple event, MirrorEvent, that contains some abstract data supplied by a user - nothing interesting. But let's have a closer look at the invoked macro.
The implement_event_emitter macro initializes ic-event-hub's state (enabling us to call functions like emit() and send_events()) with some arguments that define the event-batching behavior. The first one is the maximum batch forming time (in nanoseconds). The second one is the maximum batch size in bytes.
These arguments work in the following way:
- For each event listener there is a separate "forming batch" queue.
- Once an event is emitted, this queue is checked.
- If the size of this queue, after adding the new event to it, would exceed the max batch size (the second argument), this queue is transformed into a batch and pushed into a special "ready-to-send" queue. A new "forming batch" queue is created and the event is placed there instead.
- If the size of this queue, after adding the new event to it, is less than the max batch size, the new event is simply added to the queue.
- On each heartbeat, every such "forming batch" queue is checked for its creation timestamp.
- If it was created more than the maximum batch forming time ago, this queue is transformed into a batch and pushed into the "ready-to-send" queue.
Yes, it sounds complicated, but it is simple in practice. In our case we set these arguments as follows:
- maximum batch forming time = 20 seconds;
- maximum batch size = 500 KB.
This means that if our canister is under heavy load (emits hundreds of events per second), ic-event-hub guarantees that no batch will exceed 500 KB in size. This is a useful restriction, because message size is limited on the IC (currently 2 MB per message).
On the other hand, if our canister is idle most of the time (emits only a couple of events per hour), ic-event-hub guarantees that these events will reach their destination in no more than 20 seconds.
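The batching rules described above can be sketched in plain Rust. This is not the library's actual implementation: `Batcher`, `FormingBatch` and their fields are hypothetical names invented for this illustration, events are reduced to raw byte vectors, and time is passed in explicitly instead of being read from the system.

```rust
use std::collections::HashMap;

/// A batch that is still accepting events (hypothetical type).
struct FormingBatch {
    created_at_ns: u64,
    size_bytes: usize,
    events: Vec<Vec<u8>>,
}

/// Hypothetical stand-in for the emitter state; not the library's real type.
struct Batcher {
    max_forming_time_ns: u64,
    max_batch_size_bytes: usize,
    forming: HashMap<String, FormingBatch>, // one "forming batch" queue per listener
    ready: Vec<(String, Vec<Vec<u8>>)>,     // the "ready-to-send" queue
}

impl Batcher {
    fn new(max_forming_time_ns: u64, max_batch_size_bytes: usize) -> Self {
        Batcher {
            max_forming_time_ns,
            max_batch_size_bytes,
            forming: HashMap::new(),
            ready: Vec::new(),
        }
    }

    /// Adds an event for `listener`; if it would overflow the forming batch,
    /// that batch is sealed first and the event starts a new one.
    fn emit(&mut self, listener: &str, event: Vec<u8>, now_ns: u64) {
        let overflow = match self.forming.get(listener) {
            Some(b) => b.size_bytes + event.len() > self.max_batch_size_bytes,
            None => false,
        };
        if overflow {
            if let Some(full) = self.forming.remove(listener) {
                self.ready.push((listener.to_string(), full.events));
            }
        }
        let batch = self
            .forming
            .entry(listener.to_string())
            .or_insert_with(|| FormingBatch {
                created_at_ns: now_ns,
                size_bytes: 0,
                events: Vec::new(),
            });
        batch.size_bytes += event.len();
        batch.events.push(event);
    }

    /// Called on every heartbeat: seals batches older than the max forming time.
    fn tick(&mut self, now_ns: u64) {
        let expired: Vec<String> = self
            .forming
            .iter()
            .filter(|(_, b)| now_ns.saturating_sub(b.created_at_ns) > self.max_forming_time_ns)
            .map(|(listener, _)| listener.clone())
            .collect();
        for listener in expired {
            if let Some(b) = self.forming.remove(&listener) {
                self.ready.push((listener, b.events));
            }
        }
    }
}

fn main() {
    // 20 seconds and 500 KB: the same values as in the macro invocation above
    let mut batcher = Batcher::new(20 * 1_000_000_000, 500 * 1024);
    for i in 0..10 {
        batcher.emit("listener", vec![1u8; 100], i * 1_000_000_000);
    }
    batcher.tick(15 * 1_000_000_000);
    println!("ready batches after 15 s: {}", batcher.ready.len());
    batcher.tick(21 * 1_000_000_000);
    println!("ready batches after 21 s: {}", batcher.ready.len());
}
```

Keeping the size check in `emit` and the age check in `tick` mirrors the split described above: heavy load is bounded by batch size, light load by batch age.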
There are two more macro invocations we have to do here:
// project/emitter/actor.rs
implement_subscribe!();
implement_unsubscribe!();
These macros create two more update functions in your canister: subscribe() and unsubscribe().
You don't have to list them in your .did file.
These update functions let listener canisters subscribe to events. We'll see how it's done a little later.
By the way, you can supply a guard function to these macros, like implement_subscribe!(guard = "guard_fn_name"). This lets you deny unauthorized subscriptions, if you want such functionality.
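As an illustration of what such a guard might check, here is a minimal allow-list sketch. It rests on assumptions: guard functions in ic-cdk conventionally take no arguments and return Result<(), String>, and it is assumed (not confirmed here) that the guard passed to these macros follows the same shape. To keep the example self-contained, the caller is passed in as a plain string; in a real canister it would come from `ic_cdk::caller()`. The name `subscription_guard` is made up.

```rust
use std::collections::HashSet;

/// Hypothetical allow-list guard. In a real canister the caller would come
/// from `ic_cdk::caller()` and the function would take no arguments.
fn subscription_guard(caller: &str, allowed: &HashSet<String>) -> Result<(), String> {
    if allowed.contains(caller) {
        Ok(())
    } else {
        Err(format!("{} is not allowed to subscribe", caller))
    }
}

fn main() {
    let mut allowed = HashSet::new();
    allowed.insert("listener-a".to_string());

    println!("{:?}", subscription_guard("listener-a", &allowed));
    println!("{:?}", subscription_guard("stranger", &allowed));
}
```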
Last but not least, we want to enable event sending. ic-event-hub is designed to send events during heartbeats, so here is how we should do it:
// project/emitter/actor.rs
#[heartbeat]
pub fn tick() {
    send_events();
}
All that's left to do is to write an update function that receives users' data and emits it in the form of events:
// project/emitter/actor.rs
#[update]
fn mirror(data: Vec<u8>) {
    get_state().counter += 1;
    emit(MirrorEvent { data });
}

#[query]
fn get_requests_count() -> u64 {
    get_state().counter
}
As you might notice, there is also a counter increment on each call of the update function, as well as a query function that returns the counter's current value.
That's it for the emitter canister. The complete project/emitter/actor.rs file should look like this:
// project/emitter/actor.rs

// ------------- MAIN LOGIC -------------------

#[update]
fn mirror(data: Vec<u8>) {
    get_state().counter += 1;
    emit(MirrorEvent { data });
}

#[query]
fn get_requests_count() -> u64 {
    get_state().counter
}

// ------------------ EVENT HUB ------------------

#[derive(Event)]
pub struct MirrorEvent {
    pub data: Vec<u8>,
}

implement_event_emitter!(1_000_000_000 * 20, 500 * 1024);
implement_subscribe!();
implement_unsubscribe!();

#[heartbeat]
pub fn tick() {
    send_events();
}

// ------------------ STATE ----------------------

#[derive(Default)]
pub struct RequestCounter {
    pub counter: u64,
}

static mut STATE: Option<RequestCounter> = None;

pub fn get_state() -> &'static mut RequestCounter {
    unsafe { STATE.as_mut().unwrap() }
}

#[init]
fn init() {
    unsafe {
        STATE = Some(RequestCounter::default());
    }
}
Candid interface
// project/emitter/can.did
service : {
    "mirror" : (blob) -> ();
    "get_requests_count" : () -> (nat64) query;
}
Listener canister
This canister will receive re-transmitted data from the emitter canister in the form of event batches.
Dependencies
# project/listener/cargo.toml
[package]
name = "listener"
version = "0.1.0"
edition = "2018"
[lib]
crate-type = ["cdylib"]
path = "actor.rs"
[dependencies]
ic-cdk = "0.4.0"
ic-cdk-macros = "0.4.0"
candid = "0.7.12"
serde = "1.0.126"
ic-event-hub = "0.3.0"
ic-event-hub-macros = "0.3.0"
Buildscript
# project/listener/build.sh
#!/usr/bin/env bash
SCRIPT=$(readlink -f "$0")
SCRIPTPATH=$(dirname "$SCRIPT")
cd "$SCRIPTPATH" || exit
cargo build --target wasm32-unknown-unknown --release --package listener && \
ic-cdk-optimizer ./target/wasm32-unknown-unknown/release/listener.wasm -o ./target/wasm32-unknown-unknown/release/listener-opt.wasm
Canister logic
Let's start with the state of this canister. It has to store the principal of the emitter canister, and we want to count how many events and batches this canister receives:
// project/listener/actor.rs
pub struct RequestCounterMirror {
    pub emitter_canister_id: Principal,
    pub events_received: u64,
    pub batches_received: u64,
}

static mut STATE: Option<RequestCounterMirror> = None;

pub fn get_state() -> &'static mut RequestCounterMirror {
    unsafe { STATE.as_mut().unwrap() }
}

#[init]
fn init(emitter_canister_id: Principal) {
    unsafe {
        STATE = Some(RequestCounterMirror {
            emitter_canister_id,
            events_received: 0,
            batches_received: 0,
        });
    }
}
The code is pretty straightforward. Please notice that we pass the principal of the emitter canister into the init() function.
The listener canister needs to define the type of events it will receive (you can import it from a shared library or just redefine it in place):
// project/listener/actor.rs
#[derive(Event, Debug)]
pub struct MirrorEvent {
    pub data: Vec<u8>,
}
In order to start receiving events from the emitter canister, the listener canister has to subscribe() to them. This is done the following way:
// project/listener/actor.rs
#[update]
async fn start_listening() {
    get_state()
        .emitter_canister_id
        .subscribe(SubscribeRequest {
            callbacks: vec![CallbackInfo {
                filter: EventFilter::empty(),
                method_name: String::from("events_callback"),
            }],
        })
        .await
        .ok()
        .unwrap();
}
In a perfect world we would make this subscribe() call from the init() function, but we can't. There is a workaround for this, but we won't use it here, to keep things simple.
We're using the IEventEmitter trait here in order to make a subscribe() call by simply using dot notation on the emitter canister's principal. Take a look at this trait if you want to make your code cleaner when calling remote canisters.
Basically, by making this call, the listener canister tells the emitter canister "Hey there! Would you kindly send all events you emit to the 'events_callback' method of mine?".
Let's define this events_callback function. It is important to say that this function should follow some rules to work properly:
- it should be an update function;
- it should be named exactly as you registered it when sending the subscribe() request;
- it should have only a single argument of type Vec<Event>;
- it should have no return value (you can return something, but it will be ignored anyway).
It is also important to note that this function will be executed once for each received event batch:
// project/listener/actor.rs
#[update]
fn events_callback(events: Vec<Event>) {
    get_state().batches_received += 1;

    for event in events {
        if event.get_name().as_str() == "MirrorEvent" {
            let ev: MirrorEvent = MirrorEvent::from_event(event);
            print(format!("Got event: {:?}", ev).as_str());

            get_state().events_received += 1;
        }
    }
}
This function is pretty simple. For each event in a batch, we check the event's name and, if it matches "MirrorEvent" (the name of our event structure), we decode a MirrorEvent structure from the generic event we've received and print it.
In this function we also update the counters.
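The dispatch-by-name pattern used in the callback can be tried out without any canister machinery. The sketch below is only an approximation: `GenericEvent` is a hypothetical stand-in for the library's Event type (whose real layout is not shown in this tutorial), and `handle_batch` is a made-up name for the body of the callback.

```rust
/// Hypothetical stand-in for the library's generic event type.
struct GenericEvent {
    name: String,
    payload: Vec<u8>,
}

#[derive(Debug, PartialEq)]
struct MirrorEvent {
    data: Vec<u8>,
}

#[derive(Default)]
struct Counters {
    batches_received: u64,
    events_received: u64,
}

/// Counts the batch once, then decodes and counts every event whose name
/// matches "MirrorEvent"; events with other names are skipped.
fn handle_batch(counters: &mut Counters, batch: Vec<GenericEvent>) -> Vec<MirrorEvent> {
    counters.batches_received += 1;

    let mut decoded = Vec::new();
    for event in batch {
        if event.name == "MirrorEvent" {
            decoded.push(MirrorEvent { data: event.payload });
            counters.events_received += 1;
        }
    }
    decoded
}

fn main() {
    let mut counters = Counters::default();
    let batch = vec![
        GenericEvent { name: "MirrorEvent".to_string(), payload: vec![1, 2, 3] },
        GenericEvent { name: "SomethingElse".to_string(), payload: vec![] },
    ];
    for ev in handle_batch(&mut counters, batch) {
        println!("Got event: {:?}", ev);
    }
    println!(
        "batches: {}, events: {}",
        counters.batches_received, counters.events_received
    );
}
```

Note that the batch counter grows once per call while the event counter grows once per matching event, which is exactly the difference the test at the end of this tutorial relies on.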
All that's left to do is to define query functions to be able to read the counter values:
// project/listener/actor.rs
#[query]
fn get_events_received() -> u64 {
    get_state().events_received
}

#[query]
fn get_batches_received() -> u64 {
    get_state().batches_received
}
The complete project/listener/actor.rs file should look like this:
// project/listener/actor.rs

// ------------- MAIN LOGIC -------------------

#[query]
fn get_events_received() -> u64 {
    get_state().events_received
}

#[query]
fn get_batches_received() -> u64 {
    get_state().batches_received
}

// ----------------- EVENT HUB ----------------------

#[derive(Event, Debug)]
pub struct MirrorEvent {
    pub data: Vec<u8>,
}

#[update]
async fn start_listening() {
    get_state()
        .emitter_canister_id
        .subscribe(SubscribeRequest {
            callbacks: vec![CallbackInfo {
                filter: EventFilter::empty(),
                method_name: String::from("events_callback"),
            }],
        })
        .await
        .ok()
        .unwrap();
}

#[update]
fn events_callback(events: Vec<Event>) {
    get_state().batches_received += 1;

    for event in events {
        if event.get_name().as_str() == "MirrorEvent" {
            let ev: MirrorEvent = MirrorEvent::from_event(event);
            print(format!("Got event: {:?}", ev).as_str());

            get_state().events_received += 1;
        }
    }
}

// ------------------ STATE ----------------------

pub struct RequestCounterMirror {
    pub emitter_canister_id: Principal,
    pub events_received: u64,
    pub batches_received: u64,
}

static mut STATE: Option<RequestCounterMirror> = None;

pub fn get_state() -> &'static mut RequestCounterMirror {
    unsafe { STATE.as_mut().unwrap() }
}

#[init]
fn init(emitter_canister_id: Principal) {
    unsafe {
        STATE = Some(RequestCounterMirror {
            emitter_canister_id,
            events_received: 0,
            batches_received: 0,
        });
    }
}
Candid interface
// project/listener/can.did
service : (principal) -> {
    "start_listening" : () -> ();
    "get_events_received" : () -> (nat64) query;
    "get_batches_received" : () -> (nat64) query;
}
Tests
Now let's write a typescript test in order to check if everything works as expected.
Dependencies
// e2e-test/package.json
{
  "name": "e2e-tests",
  "version": "1.0.0",
  "main": "index.js",
  "license": "MIT",
  "scripts": {
    "start": "dfx start --clean",
    "build": "dfx build --check",
    "test": "ts-mocha --paths --timeout 1000000 src/**/*.spec.ts"
  },
  "devDependencies": {
    "@dfinity/agent": "^0.9.2",
    "@dfinity/candid": "^0.9.2",
    "@dfinity/identity": "^0.9.2",
    "@dfinity/principal": "^0.9.2",
    "@types/chai": "^4.2.21",
    "@types/mocha": "^9.0.0",
    "@types/node": "^16.4.3",
    "@types/node-fetch": "^2.5.12",
    "chai": "^4.3.4",
    "mocha": "^9.0.3",
    "node-fetch": "^2.6.1",
    "ts-mocha": "^8.0.0",
    "ts-node": "^10.1.0",
    "tsconfig-paths": "^3.10.1",
    "typescript": "^4.3.5"
  }
}
Typescript config
// e2e-test/tsconfig.json
{
  "compilerOptions": {
    "target": "ESNext",
    "module": "commonjs",
    "outDir": "./dist",
    "strict": true,
    "moduleResolution": "node",
    "baseUrl": "./",
    "paths": {
      "dfx/*": [
        ".dfx/local/canisters/*.did.js"
      ],
      "dfx-type/*": [
        ".dfx/local/canisters/*.did.d.ts"
      ]
    },
    "allowJs": true,
    "allowSyntheticDefaultImports": true,
    "esModuleInterop": true,
    "preserveSymlinks": true,
    "skipLibCheck": true,
    "forceConsistentCasingInFileNames": true
  }
}
Notice how the paths section is arranged.
DFX config
// e2e-test/dfx.json
{
  "canisters": {
    "emitter": {
      "build": "../canisters/emitter/build.sh",
      "candid": "../canisters/emitter/can.did",
      "wasm": "../canisters/emitter/target/wasm32-unknown-unknown/release/emitter-opt.wasm",
      "type": "custom"
    },
    "listener": {
      "build": "../canisters/listener/build.sh",
      "candid": "../canisters/listener/can.did",
      "wasm": "../canisters/listener/target/wasm32-unknown-unknown/release/listener-opt.wasm",
      "type": "custom"
    }
  },
  "dfx": "0.9.2",
  "networks": {
    "local": {
      "bind": "127.0.0.1:8000",
      "type": "ephemeral"
    }
  },
  "version": 1
}
Canister deployment with typescript
So, we want to have a function that would deploy our canisters, instantiate actor objects with the correct IDLs and return them back for us to use. In order to implement this function, we first need to implement another one, that will create a new canister, install the provided wasm-module on that canister and return an actor object with the provided interface. Let's script that function:
// e2e-test/src/deploy.ts
import * as fs from "fs";
import {Actor, CanisterInstallMode, getManagementCanister, HttpAgent} from "@dfinity/agent";
import {Principal} from "@dfinity/principal";

export async function deployCanister<T>(name: string, arg: number[], agent: HttpAgent): Promise<{ actor: T, canisterId: Principal }> {
    const managementCanister = getManagementCanister({agent});

    // create an empty canister on the local replica
    const {canister_id} = await managementCanister.provisional_create_canister_with_cycles({amount: [], settings: []});

    // load the compiled wasm module and the generated IDL factory
    const wasm = fs.readFileSync(`.dfx/local/canisters/${name}/${name}.wasm`);
    const {idlFactory} = await import(`dfx/${name}/${name}`)

    await managementCanister.install_code({
        canister_id,
        mode: {[CanisterInstallMode.Install]: null},
        wasm_module: [...wasm],
        arg
    });

    console.log(`Canister ${name} ${canister_id} deployed`);

    return {
        actor: Actor.createActor(idlFactory, {
            agent,
            canisterId: canister_id
        }),
        canisterId: canister_id
    };
}
Let's now define the data type we want to receive back when deployment is over. We want at least an HttpAgent and an actor object for each of the two canisters, the emitter and the listener:
// e2e-test/src/deploy.ts
import {_SERVICE as IEmitterService} from 'dfx-type/emitter/emitter';
import {_SERVICE as IListenerService} from 'dfx-type/listener/listener';

export interface ISetup {
    agent: HttpAgent;
    emitterService: IEmitterService;
    listenerService: IListenerService;
}
Notice how clean our imports look, thanks to the tsconfig-paths package.
Now we can finally write that function that will do all the work for us:
// e2e-test/src/deploy.ts
import fetch from "node-fetch";
import {Identity} from "@dfinity/agent";
import {IDL} from "@dfinity/candid";

export async function setup(identity: Identity): Promise<ISetup> {
    const agent = new HttpAgent({
        host: 'http://localhost:8000/',
        // @ts-ignore
        fetch,
        identity,
    });

    await agent.fetchRootKey();

    const {
        actor: emitterService,
        canisterId: emitterCanisterId
    } = await deployCanister<IEmitterService>('emitter', [], agent);

    // the listener's init() takes the emitter's principal as its only argument
    const {
        actor: listenerService
    } = await deployCanister<IListenerService>('listener', [...IDL.encode([IDL.Principal], [emitterCanisterId])], agent);

    return {
        agent,
        emitterService,
        listenerService,
    };
}
Now we can use this function inside tests in any way we like. One could imagine calling this function from a beforeEach callback to have a clean canister state before each test.
Testing
We're almost there. All that's left is to implement the test-case itself:
// e2e-test/src/example.spec.ts
import {assert} from "chai";
import {Ed25519KeyIdentity} from "@dfinity/identity";
import {ISetup, setup} from "./deploy";

describe('event batching', () => {
    let s: ISetup;

    before(async () => {
        s = await setup(Ed25519KeyIdentity.generate());
    });

    it("flow works fine", async () => {
        // this listener should catch all events
        await s.listenerService.start_listening();

        // checking before
        const emitterRequestsBefore = await s.emitterService.get_requests_count();
        assert.equal(emitterRequestsBefore, 0n, "Emitter state should be clean before everything");

        const listenerEventsBefore = await s.listenerService.get_events_received();
        const listenerBatchesBefore = await s.listenerService.get_batches_received();
        assert.equal(listenerEventsBefore, 0n, "Listener events state should be clean before everything");
        assert.equal(listenerBatchesBefore, 0n, "Listener batches state should be clean before everything");

        // sending 10 events each of 100 bytes of data
        for (let i = 0; i < 10; i++) {
            await s.emitterService.mirror(Array(100).fill(1));
        }

        // it should send all events in one batch
        const emitterRequestsAfter = await s.emitterService.get_requests_count();
        assert.equal(emitterRequestsAfter, 10n, "Emitter requests count should equal 10");

        // waiting for at least 10 seconds
        await delay(10_000);

        const listenerEventsAfter = await s.listenerService.get_events_received();
        assert.equal(listenerEventsAfter, 10n, "Listener events count should be equal to 10");

        const listenerBatchesAfter = await s.listenerService.get_batches_received();
        assert.equal(listenerBatchesAfter, 1n, "Listener batches count should be equal to 1");
    });
});
The test-case is pretty simple. First we check that the state of both canisters is clean. Then we call the emitter.mirror() method 10 times. Then we wait for some time and check that:
- the emitter received exactly 10 requests;
- the listener received exactly 10 events in a single batch.
By the way, the delay() function, which just asynchronously waits for the provided amount of time, is defined like this:
function delay(ms: number) {
    return new Promise(resolve => setTimeout(resolve, ms));
}
Trying it out
Now let's run our test-case. Start a local IC replica:
yarn start
Then (in a separate terminal window) build the project:
yarn build
Then run tests:
yarn test
This last command should deploy a new pair of canisters and run our test-case against them.
Afterword
In this tutorial we used the ic-event-hub library to set up an efficient communication channel between two canisters. It's important to mention that this pattern could easily be applied to more complex scenarios with tens (or even hundreds) of interacting canisters.
Complete code of this tutorial can be found here.
Thanks for reading!