Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,29 @@ export class GraphQLSubscriptionsFixture {
await this.server.nextMessage;
}

async cleanup() {
async cleanup(): Promise<void> {
this.tearDownService();
WS.clean();
await this.server.closed;
}

private tearDownService(): void {
this.graphqlService['openSocket'] = () => Promise.resolve();

const socket = this.graphqlService['socket'];
if (socket) {
socket.onopen = null;
socket.onmessage = null;
socket.onerror = null;
socket.onclose = null;

if (typeof socket.close === 'function') {
socket.close();
}

this.graphqlService['socket'] = null;
}

this.graphqlService['clearPingMonitoring']();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
import { WebSocket } from 'mock-socket';

import { GraphQLSubscriptionsFixture } from '../__fixtures__/graphql-subscriptions-fixture';
import * as backoff from '../../../util/randomized-exponential-backoff/randomized-exponential-backoff';

jest.mock('isomorphic-ws', () => WebSocket);
jest.mock('../../../util/sleep-ms/sleep-ms', () => ({
sleepMs: () => new Promise((resolve) => setTimeout(resolve, 4)),
}));

// Mirrors the constants in graphql.service.ts (not exported).
const MAX_FAILED_HANDSHAKES = 5;
const BLOCKED_RETRY_INTERVAL_MS = 5 * 60 * 1000;

describe('GraphQL blocked-WebSocket back-off', () => {
let fixture: GraphQLSubscriptionsFixture;

beforeEach(() => {
fixture = new GraphQLSubscriptionsFixture();
});

afterEach(async () => {
await fixture.cleanup();
});

it('should back off to a slow interval after repeated handshakes fail before connecting', async () => {
const service = fixture.graphqlService as any;
const backoffSpy = jest.spyOn(
backoff,
'calculateRandomizedExponentialBackoffTime',
);
const infoSpy = jest.spyOn(service.logger, 'info');

const sub = fixture.triggerSubscription();

// Each cycle: connect, never ACK, close before the connection is established.
for (let i = 0; i < MAX_FAILED_HANDSHAKES; i++) {
await fixture.waitForConnection();
await fixture.consumeInitMessage();
await fixture.closeWithCode(1001);
fixture.openServer();
}

expect(service.consecutiveFailedHandshakes).toBeGreaterThanOrEqual(
MAX_FAILED_HANDSHAKES,
);
// Fast exponential backoff is used only for the first (MAX - 1) reconnects;
// once the cap is hit, the flat slow interval is used instead.
expect(backoffSpy).toHaveBeenCalledTimes(MAX_FAILED_HANDSHAKES - 1);
expect(infoSpy).toHaveBeenCalledWith(
`Reconnect socket in ${BLOCKED_RETRY_INTERVAL_MS}ms`,
);

sub.unsubscribe();
});

it('should not count a close that happens after a successful connection', async () => {
const service = fixture.graphqlService as any;

const sub = fixture.triggerSubscription();
await fixture.handleConnectionInit(); // connection_ack -> CONNECTED
await fixture.consumeSubscribeMessage();

expect(service.consecutiveFailedHandshakes).toBe(0);

// A drop after the connection was established is a normal reconnect.
await fixture.closeWithCode(1001);
expect(service.consecutiveFailedHandshakes).toBe(0);

fixture.openServer();
await fixture.handleConnectionInit();
sub.unsubscribe();
});

it('should reset the counter when the connection is established', async () => {
const service = fixture.graphqlService as any;

const sub = fixture.triggerSubscription();

for (let i = 0; i < 2; i++) {
await fixture.waitForConnection();
await fixture.getNextMessage();
await fixture.closeWithCode(1001);
fixture.openServer();
}
expect(service.consecutiveFailedHandshakes).toBe(2);

// A successful connection_ack resets the counter back to fast reconnects.
await fixture.handleConnectionInit();
await fixture.consumeSubscribeMessage();
expect(service.consecutiveFailedHandshakes).toBe(0);

sub.unsubscribe();
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,12 @@ const PING_PONG_INTERVAL_IN_MS = 20_000;
// https://www.w3.org/TR/websockets/#concept-websocket-close-fail
const CLIENT_SIDE_CLOSE_EVENT = 1000;

// Once the WebSocket has failed to establish this many times in a row, the reconnect
// loop backs off to a slow interval instead of minting a new temporary API key on
// every fast retry. A successful connection resets the counter.
const MAX_FAILED_HANDSHAKES = 5;
const BLOCKED_RETRY_INTERVAL_MS = 5 * 60 * 1000;

/**
* A service that lets the user query Qminder API via GraphQL statements.
* Queries and subscriptions are supported. There is no support for mutations.
Expand Down Expand Up @@ -192,6 +198,10 @@ export class GraphqlService {

private connectionAttemptsCount = 0;

// Consecutive WebSocket closes that happened before the connection was ever
// established. Reset to 0 on a successful GQL_CONNECTION_ACK.
private consecutiveFailedHandshakes = 0;

constructor() {
this.setServer('api.qminder.com');

Expand Down Expand Up @@ -446,14 +456,42 @@ export class GraphqlService {
reason: event.reason,
});

// Capture this before the status is overwritten below: a close while still
// CONNECTING means the connection never established.
const closedBeforeEstablished =
this.connectionStatus === ConnectionStatus.CONNECTING;

this.setConnectionStatus(ConnectionStatus.DISCONNECTED);
this.socket = null;
this.clearPingMonitoring();

if (this.shouldRetry(event)) {
const timer = calculateRandomizedExponentialBackoffTime(
this.connectionAttemptsCount,
if (closedBeforeEstablished) {
this.consecutiveFailedHandshakes++;
Comment thread
raluik marked this conversation as resolved.
this.logger.error(
`Received socket close event before a connection was established! ` +
`Close code: ${event.code} (consecutive failed handshakes: ${this.consecutiveFailedHandshakes})`,
);
}

if (this.shouldRetry(event)) {
// After repeated failures to establish, back off to a slow interval so we
// stop creating a temporary API key on every retry. Recovers on its own:
// a successful connection resets consecutiveFailedHandshakes.
const isBlocked =
this.consecutiveFailedHandshakes >= MAX_FAILED_HANDSHAKES;

if (isBlocked) {
this.logger.warn(
`Handshake failed ${this.consecutiveFailedHandshakes} times in a row; ` +
`slowing down reconnect attempts to ${BLOCKED_RETRY_INTERVAL_MS}ms.`,
);
}

const timer = isBlocked
? BLOCKED_RETRY_INTERVAL_MS
: calculateRandomizedExponentialBackoffTime(
this.connectionAttemptsCount,
);

this.logger.info(`Reconnect socket in ${timer.toFixed(0)}ms`);

Expand All @@ -466,12 +504,6 @@ export class GraphqlService {
this.logger.error('Failed to reconnect socket: ', error);
});
}

if (this.connectionStatus === ConnectionStatus.CONNECTING) {
this.logger.error(
`Received socket close event before a connection was established! Close code: ${event.code}`,
);
}
};

this.socket.onerror = () => {
Expand All @@ -498,6 +530,9 @@ export class GraphqlService {
this.retryableErroredSubscriptionsRetryCount = 0;
this.retryableErroredSubscriptionsAction$.next({ type: 'clear' });

// Connection established — clear the blocked back-off.
this.consecutiveFailedHandshakes = 0;

this.setConnectionStatus(ConnectionStatus.CONNECTED);
this.logger.info('Connected to websocket');
this.startConnectionMonitoring();
Expand Down
Loading