Watermark Persistence
Description
Section titled “Description”This example demonstrates file-backed watermark persistence across polls.
- Implement get/set callbacks for file-based watermark storage
- Verify watermark advances after each poll
- Demonstrate at-least-once delivery semantics
Prerequisites
Section titled “Prerequisites”- LocalNet running (via
algokit localnet start)
Run This Example
Section titled “Run This Example”From the repository’s examples/subscriber directory:
cd examples/subscribernpx tsx 11-watermark-persistence.ts/** * Example: Watermark Persistence * * This example demonstrates file-backed watermark persistence across polls. * - Implement get/set callbacks for file-based watermark storage * - Verify watermark advances after each poll * - Demonstrate at-least-once delivery semantics * * Prerequisites: * - LocalNet running (via `algokit localnet start`) */import fs from 'node:fs';import os from 'node:os';import path from 'node:path';import { algo, AlgorandClient } from '@algorandfoundation/algokit-utils';import { AlgorandSubscriber } from '@algorandfoundation/algokit-subscriber';import { printHeader, printStep, printInfo, printSuccess, printError, shortenAddress, formatAlgo,} from './shared/utils.js';
async function main() { printHeader('11 — Watermark Persistence');
// Step 1: Connect to LocalNet printStep(1, 'Connect to LocalNet'); const algorand = AlgorandClient.defaultLocalNet(); const status = await algorand.client.algod.status(); printInfo(`Current round: ${status.lastRound.toString()}`); printSuccess('Connected to LocalNet');
// Step 2: Create and fund accounts printStep(2, 'Create and fund accounts'); const sender = await algorand.account.fromEnvironment('WM_SENDER', algo(100)); const receiver = await algorand.account.fromEnvironment('WM_RECEIVER', algo(10)); const senderAddr = sender.addr.toString(); const receiverAddr = receiver.addr.toString(); printInfo(`Sender: ${shortenAddress(senderAddr)}`); printInfo(`Receiver: ${shortenAddress(receiverAddr)}`); printSuccess('Accounts created and funded');
// Step 3: Set up file-backed watermark persistence printStep(3, 'Set up file-backed watermark persistence'); const watermarkFile = path.join(os.tmpdir(), 'example-watermark.txt');
// Clean up any leftover file from a previous run if (fs.existsSync(watermarkFile)) { fs.unlinkSync(watermarkFile); }
const watermarkPersistence = { get: async (): Promise<bigint> => { if (fs.existsSync(watermarkFile)) { const content = fs.readFileSync(watermarkFile, 'utf-8').trim(); return BigInt(content); } return 0n; }, set: async (newWatermark: bigint): Promise<void> => { fs.writeFileSync(watermarkFile, newWatermark.toString(), 'utf-8'); }, };
printInfo(`Watermark file: ${watermarkFile}`); printInfo(`Initial watermark: ${(await watermarkPersistence.get()).toString()}`); printSuccess('File-backed watermark persistence configured');
// Step 4: Send first batch of 2 transactions printStep(4, 'Send first batch of 2 payments'); let firstRound: bigint | undefined; for (const note of ['batch1-txn1', 'batch1-txn2']) { const result = await algorand.send.payment({ sender: sender.addr, receiver: receiver.addr, amount: algo(1), note, }); const round = result.confirmation.confirmedRound!; if (!firstRound) firstRound = round; printInfo(`Sent ${note}: round ${round}`); } printSuccess('First batch of 2 payments sent');
// Step 5: Set initial watermark so we only scan from our first transaction printStep(5, 'Set initial watermark to isolate test transactions'); const startWatermark = firstRound! - 1n; await watermarkPersistence.set(startWatermark); printInfo(`Watermark set to: ${startWatermark.toString()}`); printSuccess('Watermark positioned before first batch');
// Step 6: First poll — should catch exactly 2 transactions printStep(6, 'First poll — expect 2 transactions from batch 1');
function createSubscriber() { return new AlgorandSubscriber( { filters: [ { name: 'payments', filter: { sender: senderAddr, receiver: receiverAddr, }, }, ], syncBehaviour: 'sync-oldest', maxRoundsToSync: 100, watermarkPersistence, }, algorand.client.algod, ); }
const subscriber1 = createSubscriber(); const result1 = await subscriber1.pollOnce(); const poll1Txns = result1.subscribedTransactions;
printInfo(`Transactions matched: ${poll1Txns.length.toString()}`); for (const txn of poll1Txns) { const note = txn.note ? Buffer.from(txn.note).toString('utf-8') : ''; printInfo(` ${note}: id: ${txn.id.slice(0, 12)}...`); }
if (poll1Txns.length !== 2) { throw new Error(`Expected 2 transactions in first poll, got ${poll1Txns.length}`); } printSuccess('First poll caught exactly 2 transactions');
// Step 7: Verify watermark was saved to file printStep(7, 'Verify watermark persisted to file'); const savedWatermark = await watermarkPersistence.get(); const fileContent = fs.readFileSync(watermarkFile, 'utf-8').trim(); printInfo(`File content: ${fileContent}`); printInfo(`Watermark value: ${savedWatermark.toString()}`);
if (savedWatermark <= startWatermark) { throw new Error( `Watermark should have advanced past ${startWatermark}, but is ${savedWatermark}`, ); } printSuccess(`Watermark advanced: ${startWatermark} -> ${savedWatermark}`);
// Step 8: Send second batch of 2 transactions printStep(8, 'Send second batch of 2 payments'); for (const note of ['batch2-txn1', 'batch2-txn2']) { const result = await algorand.send.payment({ sender: sender.addr, receiver: receiver.addr, amount: algo(2), note, }); printInfo(`Sent ${note}: round ${result.confirmation.confirmedRound}`); } printSuccess('Second batch of 2 payments sent');
// Step 9: Second poll — should catch ONLY the 2 new transactions (not the old ones) printStep(9, 'Second poll — expect only 2 NEW transactions from batch 2');
// Create a fresh subscriber instance — it reads watermark from file, // proving persistence works across subscriber instances const subscriber2 = createSubscriber(); const result2 = await subscriber2.pollOnce(); const poll2Txns = result2.subscribedTransactions;
printInfo(`Transactions matched: ${poll2Txns.length.toString()}`); for (const txn of poll2Txns) { const note = txn.note ? Buffer.from(txn.note).toString('utf-8') : ''; printInfo(` ${note}: id: ${txn.id.slice(0, 12)}...`); }
if (poll2Txns.length !== 2) { throw new Error(`Expected 2 transactions in second poll, got ${poll2Txns.length}`); }
// Verify these are batch2 transactions, not batch1 const poll2Notes = poll2Txns.map(txn => txn.note ? Buffer.from(txn.note).toString('utf-8') : '', ); const allBatch2 = poll2Notes.every(note => note.startsWith('batch2')); if (!allBatch2) { throw new Error(`Expected only batch2 transactions, got: ${poll2Notes.join(', ')}`); } printSuccess('Second poll caught exactly 2 NEW transactions (batch2 only)');
// Step 10: Verify final watermark advanced again printStep(10, 'Verify final watermark'); const finalWatermark = await watermarkPersistence.get(); printInfo(`Final watermark: ${finalWatermark.toString()}`);
if (finalWatermark <= savedWatermark) { throw new Error( `Final watermark should have advanced past ${savedWatermark}, but is ${finalWatermark}`, ); } printSuccess(`Watermark advanced: ${savedWatermark} -> ${finalWatermark}`);
// Step 11: Explain at-least-once delivery semantics printStep(11, 'At-least-once delivery semantics'); console.log(); console.log(' ┌─────────────────────────────────────────────────────────────┐'); console.log(' │ Watermark Persistence & Delivery Semantics │'); console.log(' ├─────────────────────────────────────────────────────────────┤'); console.log(' │ │'); console.log(' │ The watermark is updated AFTER processing completes: │'); console.log(' │ │'); console.log(' │ 1. get() -> read current watermark │'); console.log(' │ 2. Fetch transactions from watermark to tip │'); console.log(' │ 3. Fire on/onBatch handlers │'); console.log(' │ 4. set(newWatermark) -> persist new watermark │'); console.log(' │ │'); console.log(' │ If the process crashes between steps 3 and 4, the │'); console.log(' │ watermark is NOT updated. On restart, the same │'); console.log(' │ transactions will be re-fetched and re-processed. │'); console.log(' │ │'); console.log(' │ This gives AT-LEAST-ONCE delivery: │'); console.log(' │ - Every transaction is guaranteed to be processed │'); console.log(' │ - Some transactions MAY be processed more than once │'); console.log(' │ - Handlers should be idempotent (safe to re-run) │'); console.log(' │ │'); console.log(' │ To achieve exactly-once semantics, persist the watermark │'); console.log(' │ in the same atomic transaction as your business logic │'); console.log(' │ (e.g., in a database transaction). │'); console.log(' │ │'); console.log(' └─────────────────────────────────────────────────────────────┘'); console.log();
// Step 12: Clean up temp file printStep(12, 'Clean up temp file'); fs.unlinkSync(watermarkFile); printInfo(`Deleted: ${watermarkFile}`); printSuccess('Temp file cleaned up');
printHeader('Example complete');}
main().catch(err => { printError(err.message); process.exit(1);});Other examples
Section titled “Other examples”- Basic Poll Once
- Continuous Subscriber
- Payment Filters
- Asset Transfer Subscription
- App Call Subscription
- Multiple Named Filters
- Balance Change Tracking
- ARC-28 Event Subscription
- Inner Transaction Subscription
- Batch Handling & Data Mappers
- Watermark Persistence
- Sync Behaviours
- Custom Filters
- Stateless Subscriptions
- Lifecycle Hooks & Error Handling