Copy
import { solanaInstructionDecoder, solanaPortalSource } from '@subsquid/pipes/solana'
import * as meteoraDamm from './abi/meteora-damm/index.js'
import * as meteoraDlmm from './abi/meteora-dlmm/index.js'
import * as orcaWhirlpool from './abi/orca_whirlpool/index.js'
import * as raydiumAmm from './abi/raydium-amm/index.js'
import * as raydiumClmm from './abi/raydium-clmm/index.js'
import * as raydiumCpmm from './abi/raydium-cpmm/index.js'
const from = 340_000_000
async function main() {
const stream = solanaPortalSource({
portal: 'https://portal.sqd.dev/datasets/solana-mainnet',
}).pipeComposite({
orcaWhirlpool: solanaInstructionDecoder({
range: { from },
programId: orcaWhirlpool.programId,
instructions: {
swap: orcaWhirlpool.instructions.swap,
swapV2: orcaWhirlpool.instructions.swapV2,
},
}),
meteoraDamm: solanaInstructionDecoder({
range: { from },
programId: meteoraDamm.programId,
instructions: {
swap: meteoraDamm.instructions.swap,
},
}),
meteoraDlmm: solanaInstructionDecoder({
range: { from },
programId: meteoraDlmm.programId,
instructions: {
swaps: meteoraDlmm.instructions.swap,
swapExactOut: meteoraDlmm.instructions.swapExactOut,
swapWithPriceImpact: meteoraDlmm.instructions.swapWithPriceImpact,
},
}),
raydiumAmm: solanaInstructionDecoder({
range: { from },
programId: raydiumAmm.programId,
instructions: {
swapBaseIn: raydiumAmm.instructions.swapBaseIn,
swapBaseOut: raydiumAmm.instructions.swapBaseOut,
},
}),
raydiumClmm: solanaInstructionDecoder({
range: { from },
programId: raydiumClmm.programId,
instructions: {
swap: raydiumClmm.instructions.swap,
swapV2: raydiumClmm.instructions.swapV2,
},
}),
raydiumCpmm: solanaInstructionDecoder({
range: { from },
programId: raydiumCpmm.programId,
instructions: {
swapBaseInput: raydiumCpmm.instructions.swapBaseInput,
swapBaseOutput: raydiumCpmm.instructions.swapBaseOutput,
},
}),
})
for await (const { data } of stream) {
console.log(`Orca swaps: ${data.orcaWhirlpool.swap.length + data.orcaWhirlpool.swapV2.length}`)
console.log(`Raydium AMM swaps: ${data.raydiumAmm.swapBaseIn.length + data.raydiumAmm.swapBaseOut.length}`)
console.log(`Meteora DAMM swaps: ${data.meteoraDamm.swap.length}`)
console.log(`Meteora DLMM swaps: ${data.meteoraDlmm.swaps.length + data.meteoraDlmm.swapExactOut.length}`)
console.log(`Raydium CLMM swaps: ${data.raydiumClmm.swap.length + data.raydiumClmm.swapV2.length}`)
console.log(`Raydium CPMM swaps: ${data.raydiumCpmm.swapBaseInput.length + data.raydiumCpmm.swapBaseOutput.length}`)
}
}
void main()
Processing Swaps
Extract swap details and persist to database.Copy
import { createTarget } from '@subsquid/pipes'
import { solanaInstructionDecoder, solanaPortalSource } from '@subsquid/pipes/solana'
import * as orcaWhirlpool from './abi/orca_whirlpool/index.js'
const source = solanaPortalSource({
portal: 'https://portal.sqd.dev/datasets/solana-mainnet',
})
const decoder = solanaInstructionDecoder({
range: { from: 340_000_000 },
programId: orcaWhirlpool.programId,
instructions: {
swap: orcaWhirlpool.instructions.swap,
swapV2: orcaWhirlpool.instructions.swapV2,
},
})
const target = createTarget({
write: async ({ logger, read }) => {
for await (const { data } of read()) {
const allSwaps = [...data.swap, ...data.swapV2]
for (const swap of allSwaps) {
// Access decoded instruction data
const instructionData = swap.instruction
// Access transaction details
const txHash = swap.transaction.signatures[0]
const slot = swap.blockNumber
// Access token balances if available
const tokenBalances = swap.tokenBalances
logger.info({
slot,
txHash,
programId: swap.programId,
instructionType: swap.rawInstruction.data.slice(0, 8),
})
// Persist to database
await database.insert({
slot,
txHash,
programId: swap.programId,
timestamp: swap.timestamp,
instructionData: instructionData,
})
}
}
},
})
await source.pipe(decoder).pipeTo(target)

