Cursor Management
Save progress so the pipeline can resume from the last processed slot. Use case: resume indexing after a restart without re-processing data.
import { createTarget } from "@subsquid/pipes";
import {
  solanaPortalSource,
  solanaInstructionDecoder,
} from "@subsquid/pipes/solana";
import * as orcaWhirlpool from "./abi/orca_whirlpool/index.js";
import fs from "fs/promises";

const CURSOR_FILE = "cursor.json";

async function loadCursor() {
  try {
    const data = await fs.readFile(CURSOR_FILE, "utf-8");
    return JSON.parse(data);
  } catch {
    return null;
  }
}

async function saveCursor(slotNumber: number) {
  await fs.writeFile(CURSOR_FILE, JSON.stringify({ slotNumber }));
}
// Load saved cursor
const cursor = await loadCursor();

const source = solanaPortalSource({
  portal: "https://portal.sqd.dev/datasets/solana-mainnet",
  cursor: cursor ? { number: cursor.slotNumber } : undefined,
});

const decoder = solanaInstructionDecoder({
  range: { from: cursor?.slotNumber || 200000000 },
  programId: orcaWhirlpool.programId,
  instructions: { swap: orcaWhirlpool.instructions.swap },
});

const target = createTarget({
  write: async ({ logger, read }) => {
    for await (const { data } of read()) {
      // Skip empty batches: Math.max() on an empty array yields -Infinity
      if (data.swap.length === 0) continue;
      // Process data (`database` is assumed to be a client defined elsewhere)
      await database.insert(data.swap);
      // Save cursor
      const lastSlot = Math.max(...data.swap.map((s) => s.blockNumber));
      await saveCursor(lastSlot);
      logger.info(`Processed up to slot ${lastSlot}`);
    }
  },
});

await source.pipe(decoder).pipeTo(target);
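One caveat worth noting: fs.writeFile is not atomic, so a crash mid-write can leave cursor.json truncated. A minimal hardening sketch, keeping the same CURSOR_FILE, writes to a temporary file first and renames it into place (rename is atomic on POSIX filesystems):

// Atomic variant of saveCursor: write to a temporary file, then rename it
// over cursor.json, so a crash mid-write never leaves a truncated cursor file.
async function saveCursorAtomic(slotNumber: number) {
  const tmp = `${CURSOR_FILE}.tmp`;
  await fs.writeFile(tmp, JSON.stringify({ slotNumber }));
  await fs.rename(tmp, CURSOR_FILE);
}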
Fork Handling
Handle Solana chain reorganizations. Use case: automatically roll back orphaned data when a fork occurs.
import { createTarget } from "@subsquid/pipes";
import {
  solanaPortalSource,
  solanaInstructionDecoder,
} from "@subsquid/pipes/solana";
import * as orcaWhirlpool from "./abi/orca_whirlpool/index.js";

const source = solanaPortalSource({
  portal: "https://portal.sqd.dev/datasets/solana-mainnet",
});

const decoder = solanaInstructionDecoder({
  range: { from: "latest" },
  programId: orcaWhirlpool.programId,
  instructions: { swap: orcaWhirlpool.instructions.swap },
});
const target = createTarget({
  write: async ({ logger, read }) => {
    for await (const { data } of read()) {
      // Insert swaps one row at a time (`database` is assumed to be a
      // pg-style client defined elsewhere; $1..$3 bind a single row per call)
      for (const s of data.swap) {
        await database.query(
          "INSERT INTO swaps (slot_number, transaction_hash, program_id) VALUES ($1, $2, $3)",
          [s.blockNumber, s.transaction.signatures[0], s.programId]
        );
      }
      logger.info(`Inserted ${data.swap.length} swaps`);
    }
  },
  // Assumes the rollback context exposes the same logger as write
  onRollback: async ({ cursor, logger }) => {
    logger.warn(`Fork detected at slot ${cursor.number}`);
    // Delete orphaned data
    await database.query("DELETE FROM swaps WHERE slot_number > $1", [
      cursor.number,
    ]);
    logger.info(`Rolled back to slot ${cursor.number}`);
  },
});

await source.pipe(decoder).pipeTo(target);
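The example assumes a swaps table already exists. A hypothetical Postgres schema matching the three inserted columns might look like this (the index keeps the rollback DELETE from scanning the whole table):

// Hypothetical schema for the `swaps` table used above (Postgres syntax)
await database.query(`
  CREATE TABLE IF NOT EXISTS swaps (
    slot_number BIGINT NOT NULL,
    transaction_hash TEXT NOT NULL,
    program_id TEXT NOT NULL
  )
`);
// Index slot_number so DELETE ... WHERE slot_number > $1 stays fast
await database.query(
  "CREATE INDEX IF NOT EXISTS swaps_slot_idx ON swaps (slot_number)"
);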
ClickHouse Target
Persist to ClickHouse with automatic fork handling. Use case: production-ready persistence with ClickHouse.
import { createClient } from "@clickhouse/client";
import {
  solanaPortalSource,
  solanaInstructionDecoder,
} from "@subsquid/pipes/solana";
import { clickhouseTarget } from "@subsquid/pipes/targets/clickhouse";
import * as orcaWhirlpool from "./abi/orca_whirlpool/index.js";

// Create client
const client = createClient({
  username: "default",
  password: "default",
  url: "http://localhost:8123",
});

// Create table
await client.command({
  query: `
    CREATE TABLE IF NOT EXISTS orca_swaps (
      -- DoubleDelta + ZSTD compress monotonically increasing values well
      slot_number UInt64 CODEC (DoubleDelta, ZSTD),
      timestamp DateTime CODEC (DoubleDelta, ZSTD),
      transaction_hash String,
      instruction_address UInt16,
      program_id String,
      -- 1 for live rows, -1 for cancelling rows written on fork rollback
      sign Int8 DEFAULT 1
    )
    ENGINE = CollapsingMergeTree(sign)
    ORDER BY (slot_number, transaction_hash, instruction_address)
  `,
});
const source = solanaPortalSource({
  portal: "https://portal.sqd.dev/datasets/solana-mainnet",
});

const decoder = solanaInstructionDecoder({
  range: { from: "latest" },
  programId: orcaWhirlpool.programId,
  instructions: { swap: orcaWhirlpool.instructions.swap },
});

const target = clickhouseTarget({
  client,
  onData: async ({ store, data, ctx }) => {
    ctx.logger.info(`Inserting ${data.swap.length} swaps`);
    store.insert({
      table: "orca_swaps",
      values: data.swap.map((s) => ({
        slot_number: s.blockNumber,
        // Convert milliseconds to the seconds ClickHouse DateTime expects
        timestamp: s.timestamp.valueOf() / 1000,
        transaction_hash: s.transaction.signatures[0],
        instruction_address: s.rawInstruction.instructionAddress[0],
        program_id: s.programId,
      })),
      format: "JSONEachRow",
    });
  },
  onRollback: async ({ store, safeCursor }) => {
    // Cancel all rows past the last safe cursor
    await store.removeAllRows({
      tables: ["orca_swaps"],
      where: `slot_number > ${safeCursor.number}`,
    });
  },
});

await source.pipe(decoder).pipeTo(target);
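With CollapsingMergeTree, rollbacks do not require immediate deletes: the sign column suggests cancelled rows are marked with sign = -1 and collapsed away at merge time. Until a merge happens, both an original row and its cancelling counterpart can coexist, so reads should aggregate over sign rather than use count(*). A query sketch:

// Counting swaps per slot. Aggregate over sign so rows cancelled by a
// rollback (sign = -1) net out even before ClickHouse merges parts.
const result = await client.query({
  query: `
    SELECT slot_number, sum(sign) AS swaps
    FROM orca_swaps
    GROUP BY slot_number
    HAVING sum(sign) > 0
    ORDER BY slot_number
  `,
  format: "JSONEachRow",
});
console.log(await result.json());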
File Output
Save data to JSON files. Use case: simple file-based persistence during development.
import { createTarget } from "@subsquid/pipes";
import {
  solanaPortalSource,
  solanaInstructionDecoder,
} from "@subsquid/pipes/solana";
import * as orcaWhirlpool from "./abi/orca_whirlpool/index.js";
import fs from "fs/promises";

const source = solanaPortalSource({
  portal: "https://portal.sqd.dev/datasets/solana-mainnet",
});

const decoder = solanaInstructionDecoder({
  range: { from: 200000000, to: 200001000 },
  programId: orcaWhirlpool.programId,
  instructions: { swap: orcaWhirlpool.instructions.swap },
});

const target = createTarget({
  write: async ({ logger, read }) => {
    for await (const { data } of read()) {
      const filename = `swaps-${Date.now()}.json`;
      const simplified = data.swap.map((s) => ({
        slot: s.blockNumber,
        txHash: s.transaction.signatures[0],
        programId: s.programId,
      }));
      await fs.writeFile(filename, JSON.stringify(simplified, null, 2));
      logger.info(`Saved ${data.swap.length} swaps to ${filename}`);
    }
  },
});

await source.pipe(decoder).pipeTo(target);
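One file per batch piles up quickly on longer runs. A variant sketch that appends newline-delimited JSON to a single file instead (the swaps.ndjson name is hypothetical), as a drop-in replacement for the target above:

// Append-only NDJSON variant: one line per swap in a single file,
// instead of one JSON file per batch.
const NDJSON_FILE = "swaps.ndjson";

const target = createTarget({
  write: async ({ logger, read }) => {
    for await (const { data } of read()) {
      if (data.swap.length === 0) continue;
      const lines = data.swap
        .map((s) =>
          JSON.stringify({
            slot: s.blockNumber,
            txHash: s.transaction.signatures[0],
            programId: s.programId,
          })
        )
        .join("\n");
      await fs.appendFile(NDJSON_FILE, lines + "\n");
      logger.info(`Appended ${data.swap.length} swaps to ${NDJSON_FILE}`);
    }
  },
});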

