Composite Pipes

Process multiple data streams simultaneously across different DEX programs.
const pipeline = source.pipeComposite({
  orcaSwaps: solanaInstructionDecoder({
    range: { from: 200000000 },
    programId: orcaWhirlpool.programId,
    instructions: { swap: orcaWhirlpool.instructions.swap },
  }),
  raydiumSwaps: solanaInstructionDecoder({
    range: { from: 200000000 },
    programId: raydiumAmm.programId,
    instructions: { swapBaseIn: raydiumAmm.instructions.swapBaseIn },
  }),
});

for await (const { data } of pipeline) {
  console.log(`Orca Swaps: ${data.orcaSwaps.swap.length}`);
  console.log(`Raydium Swaps: ${data.raydiumSwaps.swapBaseIn.length}`);
}
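
When the number of composite streams grows, logging each one by hand gets repetitive. The sketch below tallies every stream generically. It assumes a composite batch has the shape seen in the loop above (stream key, then instruction name, then an array of decoded items); the `CompositeBatch` type and `countCompositeBatch` helper are hypothetical names, not part of the SDK.

```typescript
// Hypothetical shape: stream key -> instruction name -> decoded items.
type CompositeBatch = Record<string, Record<string, unknown[]>>;

// Count decoded items for every "stream.instruction" pair in one batch.
function countCompositeBatch(data: CompositeBatch): Record<string, number> {
  const counts: Record<string, number> = {};
  for (const [stream, instructions] of Object.entries(data)) {
    for (const [name, items] of Object.entries(instructions)) {
      counts[`${stream}.${name}`] = items.length;
    }
  }
  return counts;
}

// Stand-in data in place of real decoded swaps.
const exampleCounts = countCompositeBatch({
  orcaSwaps: { swap: [{}, {}] },
  raydiumSwaps: { swapBaseIn: [{}] },
});
```

Inside the `for await` loop, something like `console.log(countCompositeBatch(data))` could then replace the per-stream log lines, provided the batch really has this shape.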

Portal Caching

Cache Portal responses locally for faster iteration.
import { portalSqliteCache } from "@subsquid/pipes/portal-cache/node";

const source = solanaPortalSource({
  portal: "https://portal.sqd.dev/datasets/solana-mainnet",
  cache: portalSqliteCache({
    path: "./portal-cache.sqlite",
  }),
});

When to Use

  • Development iteration
  • Testing pipelines
  • Repeated processing of same slots
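
To illustrate why repeated runs over the same slots benefit from a cache, here is a minimal in-memory sketch of the general idea: responses are keyed by slot range, so a second request for the same range never reaches the upstream. This is only a conceptual illustration; it is not how `portalSqliteCache` is implemented, and `withRangeCache` and `RangeFetcher` are hypothetical names.

```typescript
// Hypothetical fetcher signature: load data for an inclusive slot range.
type RangeFetcher<T> = (from: number, to: number) => Promise<T>;

// Wrap a fetcher so that identical ranges are served from memory.
function withRangeCache<T>(fetchRange: RangeFetcher<T>): RangeFetcher<T> {
  const cache = new Map<string, Promise<T>>();
  return (from, to) => {
    const key = `${from}-${to}`;
    const cached = cache.get(key);
    if (cached) return cached;
    const pending = fetchRange(from, to);
    cache.set(key, pending);
    // Do not keep failed requests, so a later retry hits the upstream again.
    pending.catch(() => {
      cache.delete(key);
    });
    return pending;
  };
}

// Stand-in upstream that counts how often it is actually called.
let upstreamCalls = 0;
const cachedFetch = withRangeCache(async (from: number, to: number) => {
  upstreamCalls += 1;
  return to - from;
});

void cachedFetch(200000000, 200000100);
void cachedFetch(200000000, 200000100); // same range: served from the cache
```

A persistent cache such as the SQLite one above applies the same principle across process restarts, which is what makes it useful during development.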

Custom Logging

The Pipes SDK uses a Pino-compatible logger, allowing you to integrate custom log transports and send logs to external services like GCP Cloud Logging, Sentry, or any other Pino-compatible destination.

Basic Custom Logger

Pass a custom logger to the source to configure logging for your entire pipeline.
import { createTarget } from "@subsquid/pipes";
import { solanaPortalSource } from "@subsquid/pipes/solana";
import pino from "pino";

const transport = pino.transport({
  target: "pino-pretty",
  options: {
    colorize: true,
    translateTime: "HH:MM:ss",
  },
});

const source = solanaPortalSource({
  portal: "https://portal.sqd.dev/datasets/solana-mainnet",
  logger: pino(transport),
});

const target = createTarget({
  write: async ({ logger, read }) => {
    for await (const { data } of read()) {
      logger.info({ count: data.length }, "Processed batch");
    }
  },
});

await source.pipeTo(target);

Integration with Cloud Services

You can use any Pino transport to send logs to cloud services. Pass the configured logger to the source.
import { createTarget } from "@subsquid/pipes";
import { solanaPortalSource } from "@subsquid/pipes/solana";
import pino from "pino";

const transport = pino.transport({
  target: "@google-cloud/logging-pino",
  options: {
    projectId: "your-project-id",
    logName: "solana-pipes-indexer",
  },
});

const source = solanaPortalSource({
  portal: "https://portal.sqd.dev/datasets/solana-mainnet",
  logger: pino(transport),
});

const target = createTarget({
  write: async ({ logger, read }) => {
    for await (const { data } of read()) {
      logger.info(
        {
          slotsProcessed: data.slots?.length,
          instructionsCount: data.swap?.length,
        },
        "Batch processed"
      );
    }
  },
});

await source.pipeTo(target);
The ctx.logger in transformers and targets is the same logger instance passed to the source. Configure logging at the source level, then use ctx.logger throughout your pipeline for consistent logging.

Custom Metrics

Track custom performance metrics.
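import { createTarget } from "@subsquid/pipes";

const metrics = {
  slotsProcessed: 0,
  instructionsDecoded: 0,
  startTime: Date.now(),
};

// Log a snapshot each time at least this many new slots have been processed.
const LOG_EVERY_SLOTS = 100;
let lastLoggedAt = 0;

const target = createTarget({
  // Assumption: `profiler` is available on the write context, like `logger`.
  write: async ({ read, profiler }) => {
    for await (const { data } of read()) {
      const span = profiler.start("processing");

      metrics.slotsProcessed += data.blocks?.length || 0;
      metrics.instructionsDecoded += data.swap?.length || 0;

      // processData is a placeholder for your own batch handler.
      await processData(data);

      span.end();

      // Batch sizes vary, so compare against the last logged count rather than
      // waiting for the running total to land exactly on a multiple of 100.
      if (metrics.slotsProcessed - lastLoggedAt >= LOG_EVERY_SLOTS) {
        lastLoggedAt = metrics.slotsProcessed;
        const elapsed = (Date.now() - metrics.startTime) / 1000;
        const sps = metrics.slotsProcessed / elapsed;
        console.log({
          slots: metrics.slotsProcessed,
          instructions: metrics.instructionsDecoded,
          slotsPerSec: sps.toFixed(2),
        });
      }
    }
  },
});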

Profiler Usage

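Wrap each stage in its own span to time it separately.
const target = createTarget({
  // Assumption: `profiler` is available on the write context, like `logger`.
  write: async ({ read, profiler }) => {
    for await (const { data } of read()) {
      const decodeSpan = profiler.start("decode");
      const decoded = decodeInstructions(data); // placeholder for your decoder
      decodeSpan.end();

      const saveSpan = profiler.start("save");
      await database.insert(decoded); // placeholder for your storage client
      saveSpan.end();
    }
  },
});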
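
If a stage throws between `start` and `end`, its span is never closed. One way to guard against that is a small wrapper that always ends the span. The sketch below assumes only the two calls used above (`profiler.start(name)` returning an object with `end()`); `ProfilerLike`, `SpanLike`, and `withSpan` are hypothetical names, and the SDK's real profiler type may differ.

```typescript
// Minimal structural types matching the calls used in the snippets above.
interface SpanLike {
  end(): void;
}
interface ProfilerLike {
  start(name: string): SpanLike;
}

// Run `fn` inside a named span and close the span whether `fn` returns,
// throws, or returns a promise that settles later.
function withSpan<T>(profiler: ProfilerLike, name: string, fn: () => T): T {
  const span = profiler.start(name);
  let result: T;
  try {
    result = fn();
  } catch (err) {
    span.end();
    throw err;
  }
  if (result instanceof Promise) {
    // Only native promises are detected; other thenables end the span early.
    return result.finally(() => span.end()) as unknown as T;
  }
  span.end();
  return result;
}

// Stand-in profiler that records span events, for demonstration only.
const events: string[] = [];
const recordingProfiler: ProfilerLike = {
  start: (name) => {
    events.push(`start:${name}`);
    return {
      end: () => {
        events.push(`end:${name}`);
      },
    };
  },
};

const decodedCount = withSpan(recordingProfiler, "decode", () => 3);
```

With this helper, the save stage could be written as `await withSpan(profiler, "save", () => database.insert(decoded))`, assuming `database.insert` returns a native promise.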

Indexing Latency

Measure time from slot production to indexing.
const target = createTarget({
  write: async ({ logger, read }) => {
    for await (const { data } of read()) {
      for (const swap of data.swap) {
        const slotTime = swap.timestamp.getTime();
        const now = Date.now();
        const latency = now - slotTime;

        logger.info({
          slot: swap.blockNumber,
          latencyMs: latency,
          latencySec: (latency / 1000).toFixed(2),
        });
      }
    }
  },
});
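
Logging one line per swap can be noisy on busy slots. As an alternative, the latencies of a batch can be reduced to a single summary. The sketch below assumes each item carries a `Date` timestamp, as `swap.timestamp` does above; `summarizeLatency` and `LatencySummary` are hypothetical names.

```typescript
interface LatencySummary {
  count: number;
  minMs: number;
  maxMs: number;
  avgMs: number;
}

// Reduce slot timestamps to min/max/average latency relative to `now`.
// Returns undefined for an empty batch so callers can skip logging.
function summarizeLatency(
  slotTimes: Date[],
  now: number = Date.now(),
): LatencySummary | undefined {
  if (slotTimes.length === 0) return undefined;
  let minMs = Infinity;
  let maxMs = -Infinity;
  let totalMs = 0;
  for (const time of slotTimes) {
    const latency = now - time.getTime();
    minMs = Math.min(minMs, latency);
    maxMs = Math.max(maxMs, latency);
    totalMs += latency;
  }
  return { count: slotTimes.length, minMs, maxMs, avgMs: totalMs / slotTimes.length };
}

// Three slots produced 1s, 2s, and 3s before a fixed "now" of 10_000 ms.
const exampleSummary = summarizeLatency(
  [new Date(9000), new Date(8000), new Date(7000)],
  10000,
);
```

In the target, this could be called once per batch, for example with `data.swap.map((s) => s.timestamp)`, and the result passed to `logger.info`.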

RPC Latency Monitoring

Monitor latency between Portal and external RPC providers.
import { solanaRpcLatencyWatcher } from '@subsquid/pipes/solana'

const stream = solanaPortalSource({
  portal: 'https://portal.sqd.dev/datasets/solana-mainnet',
  query: { from: 'latest' },
}).pipe(
  solanaRpcLatencyWatcher({
    rpcUrl: ['https://api.mainnet-beta.solana.com'],
  }).pipe({
    profiler: { id: 'expose metrics' },
    transform: (data, { metrics }) => {
      if (!data) return

      for (const rpc of data.rpc) {
        metrics
          .gauge({
            name: 'rpc_latency_ms',
            help: 'RPC Latency in ms',
            labelNames: ['url'],
          })
          .set({ url: rpc.url }, rpc.portalDelayMs)
      }

      return data
    },
  }),
)

for await (const { data } of stream) {
  if (!data) continue
  console.log(`Slot: ${data.number} / ${data.timestamp.toString()}`)
  console.table(data.rpc)
}

Combining Patterns

// Portal caching + Composite pipes + Custom metrics
const source = solanaPortalSource({
  portal: 'https://portal.sqd.dev/datasets/solana-mainnet',
  // Same cache configuration as in the Portal Caching section above.
  cache: portalSqliteCache({ path: './cache.sqlite' }),
})

const metrics = {
  orcaSwaps: 0,
  raydiumSwaps: 0
}

const pipeline = source.pipeComposite({
  orcaSwaps: solanaInstructionDecoder({
    profiler: { id: 'Orca' },
    range: { from: 200000000 },
    programId: orcaWhirlpool.programId,
    instructions: { swap: orcaWhirlpool.instructions.swap }
  }),
  raydiumSwaps: solanaInstructionDecoder({
    profiler: { id: 'Raydium' },
    range: { from: 200000000 },
    programId: raydiumAmm.programId,
    instructions: { swapBaseIn: raydiumAmm.instructions.swapBaseIn }
  })
})

const target = clickhouseTarget({
  client,
  onData: async ({store, data, ctx}) => {
    metrics.orcaSwaps += data.orcaSwaps.swap.length
    metrics.raydiumSwaps += data.raydiumSwaps.swapBaseIn.length

    ctx.logger.info(metrics)

    store.insert({
      table: 'orca_swaps',
      values: data.orcaSwaps.swap.map(s => ({...})),
      format: 'JSONEachRow'
    })

    store.insert({
      table: 'raydium_swaps',
      values: data.raydiumSwaps.swapBaseIn.map(s => ({...})),
      format: 'JSONEachRow'
    })
  },
  onRollback: async ({store, safeCursor}) => {
    await store.removeAllRows({
      tables: ['orca_swaps', 'raydium_swaps'],
      where: `slot_number > ${safeCursor.number}`
    })
  }
})

await pipeline.pipeTo(target)
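
The `onRollback` predicate above keeps every row at or below the safe cursor and removes everything after it. The in-memory sketch below mirrors that `slot_number > safeCursor.number` condition on a plain array, which can be handy for reasoning about or unit-testing rollback behaviour. `rollbackRows` and the row fields other than `slot_number` are hypothetical.

```typescript
// Keep only rows at or below the safe slot, i.e. drop rows where
// slot_number > safeSlot, matching the `where` clause used in onRollback.
function rollbackRows<T extends { slot_number: number }>(
  rows: T[],
  safeSlot: number,
): T[] {
  return rows.filter((row) => row.slot_number <= safeSlot);
}

// Stand-in rows; `id` is an illustrative field only.
const rowsBeforeRollback = [
  { slot_number: 200000100, id: "a" },
  { slot_number: 200000101, id: "b" },
  { slot_number: 200000102, id: "c" },
];

const rowsAfterRollback = rollbackRows(rowsBeforeRollback, 200000101);
```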