# Client Implementations

To get you up and running quickly, the following sections show how to use subcriptions with common GraphQL client libraries.

# Apollo

To use Lighthouse subscriptions with the Apollo (opens new window) client library you will need to create an apollo-link

import { ApolloLink, Observable } from "apollo-link";

class PusherLink extends ApolloLink {
  constructor(options) {
    super();
    // Retain a handle to the Pusher client
    this.pusher = options.pusher;
  }

  request(operation, forward) {
    return new Observable((observer) => {
      // Check the result of the operation
      forward(operation).subscribe({
        next: (data) => {
          // If the operation has the subscription extension, it's a subscription
          const subscriptionChannel = this._getChannel(data, operation);

          if (subscriptionChannel) {
            this._createSubscription(subscriptionChannel, observer);
          } else {
            // No subscription found in the response, pipe data through
            observer.next(data);
            observer.complete();
          }
        },
      });
    });
  }

  _getChannel(data, operation) {
    return !!data.extensions &&
      !!data.extensions.lighthouse_subscriptions &&
      !!data.extensions.lighthouse_subscriptions.channels
      ? data.extensions.lighthouse_subscriptions.channels[
          operation.operationName
        ]
      : null;
  }

  _createSubscription(subscriptionChannel, observer) {
    const pusherChannel = this.pusher.subscribe(subscriptionChannel);
    // Subscribe for more update
    pusherChannel.bind("lighthouse-subscription", (payload) => {
      if (!payload.more) {
        // This is the end, the server says to unsubscribe
        this.pusher.unsubscribe(subscriptionChannel);
        observer.complete();
      }
      const result = payload.result;
      if (result) {
        // Send the new response to listeners
        observer.next(result);
      }
    });
  }
}

export default PusherLink;

Then initialize the pusher client and use it in the link stack.

const pusherLink = new PusherLink({
  pusher: new Pusher(PUSHER_API_KEY, {
    cluster: PUSHER_CLUSTER,
    authEndpoint: `${API_LOCATION}/graphql/subscriptions/auth`,
    auth: {
      headers: {
        authorization: BEARER_TOKEN,
      },
    },
  }),
});

const link = ApolloLink.from([pusherLink, httpLink(`${API_LOCATION}/graphql`)]);

# Relay Modern

To use Lighthouse's subscriptions with Relay Modern you will need to create a custom handler and inject it into Relay's environment.

import Pusher from "pusher-js";
import {
  Environment,
  Network,
  Observable,
  RecordSource,
  Store,
} from "relay-runtime";

const pusherClient = new Pusher(PUSHER_API_KEY, {
  cluster: "us2",
  authEndpoint: `${API_LOCATION}/graphql/subscriptions/auth`,
  auth: {
    headers: {
      authorization: BEARER_TOKEN,
    },
  },
});

const createHandler = (options) => {
  let channelName;
  const { pusher, fetchOperation } = options;

  return (operation, variables, cacheConfig) => {
    return Observable.create((sink) => {
      fetchOperation(operation, variables, cacheConfig)
        .then((response) => {
          return response.json();
        })
        .then((json) => {
          channelName =
            !!response.extensions &&
            !!response.extensions.lighthouse_subscriptions &&
            !!response.extensions.lighthouse_subscriptions.channels
              ? response.extensions.lighthouse_subscriptions.channels[
                  operation.name
                ]
              : null;

          if (!channelName) {
            return;
          }

          const channel = pusherClient.subscribe(channelName);

          channel.bind(`lighthouse-subscription`, (payload) => {
            const result = payload.result;

            if (result && result.errors) {
              sink.error(result.errors);
            } else if (result) {
              sink.next({
                data: result.data,
              });
            }

            if (!payload.more) {
              sink.complete();
            }
          });
        });
    }).finally(() => {
      pusherClient.unsubscribe(channelName);
    });
  };
};

const fetchOperation = (operation, variables, cacheConfig) => {
  const bodyValues = {
    variables,
    query: operation.text,
    operationName: operation.name,
  };

  return fetch(`${API_LOCATION}/graphql`, {
    method: "POST",
    opts: {
      credentials: "include",
    },
    headers: {
      Accept: "application/json",
      "Content-Type": "application/json",
      Authorization: BEARER_TOKEN,
    },
    body: JSON.stringify(bodyValues),
  });
};

const fetchQuery = (operation, variables, cacheConfig) => {
  return fetchOperation(operation, variables, cacheConfig).then((response) => {
    return response.json();
  });
};

const subscriptionHandler = createHandler({
  pusher: pusherClient,
  fetchOperation: fetchOperation,
});

const network = Network.create(fetchQuery, subscriptionHandler);

export const environment = new Environment({
  network,
  store: new Store(new RecordSource()),
});

# Flutter/Dart

To use Lighthouse's Pusher subscriptions with Flutter/Dart GQL libraries like Ferry (opens new window), you will need to create a custom link below:

import 'dart:async';
import 'dart:convert';

import 'package:dart_pusher_channels/dart_pusher_channels.dart';
import 'package:gql_exec/gql_exec.dart';
import 'package:gql_link/gql_link.dart';

typedef WsEventDecoder =
    FutureOr<Map<String, dynamic>?> Function(ChannelReadEvent event);

typedef ChannelNameGetter = String? Function(Response result);

typedef AuthTokenHeadersGetter =
    FutureOr<Map<String, String>> Function(String channelName);

/// A link that handles GraphQL subscriptions using Pusher Channels Protocol.
///
/// Example:
/// ```dart
/// final pusherChannelsLink = PusherChannelsLink(
///   wsHost: 'localhost',
///   wsPort: 8080,
///   appKey: 'your-app-key',
///   scheme: 'ws',
///   eventName: 'lighthouse-subscription',
///   getChannelName: (response) => response.response['extensions']?['lighthouse_subscriptions']?['channel'],
///   authUrl: 'http://localhost:8000/graphql/subscriptions/auth', // optional if you want to use a public channel
///   getAuthToken: (channelName) async { // optional if you want to use a public channel
///     final token = await secureStorage.read(key: 'token');
///
///     if (token == null) {
///       return null;
///     }
///
///     return 'Bearer $token';
///   },
/// );
///
/// final link = Link.from([
///   pusherChannelsLink,
///   HttpLink('http://localhost:8000/graphql'),
/// ]);
/// ```
///
/// Pub dependencies:
/// - [dart_pusher_channels](http://pub.dev/packages/dart_pusher_channels)
/// - [gql_exec](http://pub.dev/packages/gql_exec)
/// - [gql_link](http://pub.dev/packages/gql_link)
///
/// Supported servers:
///
/// - [graphql-ruby](https://graphql-ruby.org/javascript_client/graphiql_subscriptions#pusher) - Pusher & Ably broadcasters.
/// - [lighthouse-php](https://lighthouse-php.com/6/subscriptions/getting-started.html) - Pusher & Laravel Reverb broadcasters.
///
/// Notes:
///
/// Make sure to enable `NoCache` policy for the `subscription` type so you won't be getting old events
/// and this link should always be next to the link that produces the HTTP response.
///
/// And other servers that uses the Pusher Channels Protocol to broadcast GraphQL subscriptions.
class PusherChannelsLink extends Link {
  /// Creates a new [PusherChannelsLink] instance.
  PusherChannelsLink({
    required this.wsHost,
    required this.wsPort,
    required this.appKey,
    required this.scheme,
    required this.eventName,
    required this.getChannelName,
    this.cluster,
    this.authUrl,
    this.getAuthTokenHeaders,
    this.logger,
    this.parser = const ResponseParser(),
    this.wsEventDecoder = _defaultWsEventDecoder,
  }) {
    options = cluster != null
        ? PusherChannelsOptions.fromCluster(
            cluster: cluster!,
            key: appKey,
            host: wsHost,
            port: wsPort,
            scheme: scheme,
            shouldSupplyMetadataQueries: true,
            metadata: PusherChannelsOptionsMetadata.byDefault(),
          )
        : PusherChannelsOptions.fromHost(
            scheme: scheme,
            key: appKey,
            host: wsHost,
            port: wsPort,
            shouldSupplyMetadataQueries: true,
            metadata: PusherChannelsOptionsMetadata.byDefault(),
          );

    _client = PusherChannelsClient.websocket(
      options: options,
      connectionErrorHandler: (exception, stackTrace, refresh) {
        logger?.call('ws link connection error: $exception $stackTrace');

        Future.delayed(const Duration(seconds: 1), refresh);
      },
    );

    _connectionEstablishedSub = _client.onConnectionEstablished.listen((_) {
      logger?.call(
        'ws link connection established: ${_channels.map((e) => e.name).toList()}',
      );

      for (final channel in _channels) {
        channel.subscribeIfNotUnsubscribed();
      }
    });

    _client.connect();
  }

  /// The port of the Pusher server.
  final int wsPort;

  /// The host of the Pusher server.
  final String wsHost;

  /// The app key of the Pusher app.
  final String appKey;

  /// The scheme of the Pusher server (ws or wss).
  final String scheme;

  /// The event name of the Pusher Channels that we should listen to get the GraphQL subscription streams.
  final String eventName;

  /// The function that gets the channel name from the response to determine if we should start a GraphQL subscription.
  final ChannelNameGetter getChannelName;

  /// The cluster of the Pusher app.
  final String? cluster;

  /// The URL of the authentication endpoint that can be used to authenticate the Pusher connection.
  final String? authUrl;

  /// The function that gets the authentication token headers for the authorization URL.
  final AuthTokenHeadersGetter? getAuthTokenHeaders;

  /// The parser function that parses the Pusher Channels response.
  final ResponseParser parser;

  /// The event decoder function that decodes the Pusher Channels event data.
  final WsEventDecoder wsEventDecoder;

  /// The logger function.
  final void Function(String message)? logger;

  /// The list of channels that are subscribed to.
  final List<Channel> _channels = [];

  /// The Pusher client.
  late final PusherChannelsClient _client;

  /// The Pusher options.
  late final PusherChannelsOptions options;

  /// The subscription to the connection established event.
  late final StreamSubscription<void> _connectionEstablishedSub;

  static Map<String, dynamic>? _defaultWsEventDecoder(ChannelReadEvent event) {
    final data = event.data;

    if (data! is String) {
      return null;
    }

    return jsonDecode(data) as Map<String, dynamic>?;
  }

  
  Stream<Response> request(Request request, [NextLink? forward]) {
    final controller = StreamController<Response>();

    Channel? channel;

    forward?.call(request).listen((result) async {
      final channelName = getChannelName(result);

      if (channelName is! String) {
        controller.add(result);

        controller.close();

        return;
      }

      final tokenHeaders = authUrl != null
          ? await getAuthTokenHeaders?.call(channelName)
          : null;

      channel =
          (tokenHeaders != null
                  ? _client.privateChannel(
                      channelName,
                      forceCreateNewInstance: true,
                      authorizationDelegate:
                          EndpointAuthorizableChannelTokenAuthorizationDelegate.forPrivateChannel(
                            authorizationEndpoint: Uri.parse(authUrl!),
                            headers: tokenHeaders,
                          ),
                    )
                  : _client.publicChannel(
                      channelName,
                      forceCreateNewInstance: true,
                    ))
              as Channel;

      _channels.add(channel!);

      channel!.whenSubscriptionSucceeded().listen((event) {
        logger?.call('ws link subscription succeeded: ${event.data}');
      });

      channel!.onAuthenticationSubscriptionFailed().listen((event) {
        logger?.call('ws link subscription failed: ${event.data}');
      });

      channel!.bind(eventName).listen((event) async {
        final responseBody = await wsEventDecoder(event);

        if (responseBody == null) {
          return;
        }

        final hasMore = responseBody['more'] ?? true;

        final payload = responseBody['result'] ?? <String, dynamic>{};

        final wsResponse = parser.parseResponse(payload);

        if (wsResponse.data != null || wsResponse.errors != null) {
          controller.add(wsResponse);
        }

        if (!hasMore) {
          channel?.unsubscribe();

          await controller.close();

          logger?.call('ws subscription has no more data.');
        }
      }, onError: controller.addError);

      channel!.subscribe();
    }, onError: controller.addError);

    controller.onCancel = () async {
      channel?.unsubscribe();

      logger?.call('ws controller cancelled.');
    };

    return controller.stream;
  }

  
  Future<void> dispose() async {
    for (final channel in _channels) {
      channel.unsubscribe();
    }

    _channels.clear();

    await _connectionEstablishedSub.cancel();

    _client.dispose();

    super.dispose();
  }
}