Skip to content

Conversation

@IMax153
Copy link
Collaborator

@IMax153 IMax153 commented Dec 23, 2025

Allows streaming record batches via Arrow Flight SQL queries.

import { ArrowFlight, Auth } from "@edgeandnode/amp"
import * as ArrowFlightNode from "@edgeandnode/amp/ArrowFlight/Node"
import { NodeHttpClient, NodeKeyValueStore, NodeRuntime } from "@effect/platform-node"
import { Console, Effect, Layer, Schema, Stream } from "effect"

const program = Effect.gen(function*() {
  const flight = yield* ArrowFlight.ArrowFlight
  yield* flight.streamQuery(`SELECT * FROM "_/counter@dev".incremented LIMIT 5`, {
    // Optionally specify a schema to validate the shape of the record batch data
    schema: Schema.Struct({
      block_hash: Schema.String,
      tx_hash: Schema.String,
      address: Schema.String,
      block_num: Schema.String,
      timestamp: Schema.DateTimeUtc,
      count: Schema.String
    }).pipe(Schema.rename({
      block_hash: "blockHash",
      block_num: "blockNum",
      tx_hash: "txHash"
    }))
  }).pipe(
    Stream.runForEach(Console.log)
  )
})

/**
 * Setup the gRPC transport layer with an interceptor that will provide
 * authentication information
 */
const TransportLayer = ArrowFlightNode.layerTransportGrpc({
  baseUrl: `http://localhost:1602`
}).pipe(Layer.provide(ArrowFlight.layerInterceptorBearerAuth))

/**
 * Setup the Arrow Flight layer to be able to execute Arrow Flight SQL queries
 */
const ArrowFlightLayer = ArrowFlight.layer.pipe(
  Layer.provide(TransportLayer)
)

/**
 * Setup the platform-specific dependencies required for our application
 */
const PlatformLayer = Layer.mergeAll(
  NodeHttpClient.layerUndici,
  NodeKeyValueStore.layerFileSystem(".cache")
)

/**
 * Setup the layer which provides authentication services
 */
const AuthLayer = Auth.layer.pipe(
  Layer.provide(PlatformLayer)
)

/**
 * Provide the Auth layer to the Arrow Flight layer and merge the outputs so
 * we can use both in our program
 */
const MainLayer = ArrowFlightLayer.pipe(
  Layer.provideMerge(AuthLayer)
)

program.pipe(
  Effect.provide(MainLayer),
  NodeRuntime.runMain
)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants