Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions dev-packages/cloudflare-integration-tests/suites/queue/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import type { MessageBatch, Queue } from '@cloudflare/workers-types';
import * as Sentry from '@sentry/cloudflare';

interface Env {
SENTRY_DSN: string;
MY_QUEUE: Queue<{ trigger?: 'error'; payload?: string }>;
}

export default Sentry.withSentry(
(env: Env) => ({
dsn: env.SENTRY_DSN,
tracesSampleRate: 1,
}),
{
async fetch(request, env) {
const url = new URL(request.url);

if (url.pathname === '/enqueue/error') {
await env.MY_QUEUE.send({ trigger: 'error' });
return new Response('enqueued error');
}

if (url.pathname === '/enqueue/ok') {
await env.MY_QUEUE.send({ payload: 'hello' });
return new Response('enqueued ok');
}

if (url.pathname === '/enqueue/batch') {
await env.MY_QUEUE.sendBatch([
{ body: { payload: 'one' } },
{ body: { payload: 'two' } },
{ body: { payload: 'three' } },
]);
return new Response('enqueued batch');
}

return new Response('not found', { status: 404 });
},
async queue(batch: MessageBatch<{ trigger?: 'error'; payload?: string }>) {
for (const message of batch.messages) {
if (message.body.trigger === 'error') {
throw new Error('Boom from queue handler');
}
}
},
} as ExportedHandler<Env>,
);
128 changes: 128 additions & 0 deletions dev-packages/cloudflare-integration-tests/suites/queue/test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
import type { Envelope } from '@sentry/core';
import { expect, it } from 'vitest';
import { createRunner } from '../../runner';

function envelopeItemType(envelope: Envelope): string | undefined {
return envelope[1][0]?.[0]?.type as string | undefined;
}

function envelopeItem(envelope: Envelope): Record<string, unknown> {
return envelope[1][0]![1] as Record<string, unknown>;
}

function findPublishSpan(envelope: Envelope): Record<string, unknown> | undefined {
if (envelopeItemType(envelope) !== 'transaction') return undefined;
const tx = envelopeItem(envelope);
const spans = (tx.spans as Array<Record<string, unknown>>) || [];
return spans.find(s => (s.op as string) === 'queue.publish');
}

function isConsumerTransaction(envelope: Envelope): boolean {
if (envelopeItemType(envelope) !== 'transaction') return false;
const tx = envelopeItem(envelope);
return tx.transaction === 'process test-queue';
}

it('captures errors thrown by the queue handler with the correct mechanism', async ({ signal }) => {
const runner = createRunner(__dirname)
.ignore('transaction')
.expect((envelope: Envelope) => {
expect(envelopeItemType(envelope)).toBe('event');
const event = envelopeItem(envelope);
expect(event).toMatchObject({
level: 'error',
exception: {
values: [
{
type: 'Error',
value: 'Boom from queue handler',
mechanism: { type: 'auto.faas.cloudflare.queue', handled: false },
},
],
},
});
})
.start(signal);

await runner.makeRequest('post', '/enqueue/error');
await runner.completed();
});

it('emits a queue.publish span on env.MY_QUEUE.send and a queue.process transaction on the consumer', async ({
signal,
}) => {
const runner = createRunner(__dirname)
.unordered()
.expect((envelope: Envelope) => {
// Producer transaction must contain a queue.publish child span
const publishSpan = findPublishSpan(envelope);
expect(publishSpan).toBeDefined();
expect(publishSpan).toMatchObject({
op: 'queue.publish',
description: 'send MY_QUEUE',
data: expect.objectContaining({
'messaging.system': 'cloudflare',
'messaging.destination.name': 'MY_QUEUE',
'messaging.operation.type': 'send',
'messaging.operation.name': 'send',
'sentry.origin': 'auto.faas.cloudflare.queue',
}),
});
})
.expect((envelope: Envelope) => {
expect(isConsumerTransaction(envelope)).toBe(true);
const tx = envelopeItem(envelope);
const trace = (tx.contexts as Record<string, Record<string, unknown>>).trace as Record<string, unknown>;
expect(trace).toMatchObject({
op: 'queue.process',
origin: 'auto.faas.cloudflare.queue',
data: expect.objectContaining({
'messaging.system': 'cloudflare',
'messaging.destination.name': 'test-queue',
'messaging.operation.type': 'process',
'messaging.operation.name': 'process',
'messaging.batch.message_count': 1,
'faas.trigger': 'pubsub',
}),
});
})
.start(signal);

await runner.makeRequest('post', '/enqueue/ok');
await runner.completed();
});

it('emits a queue.publish span with batch attributes on env.MY_QUEUE.sendBatch', async ({ signal }) => {
const runner = createRunner(__dirname)
.unordered()
.expect((envelope: Envelope) => {
const publishSpan = findPublishSpan(envelope);
expect(publishSpan).toBeDefined();
expect(publishSpan).toMatchObject({
op: 'queue.publish',
description: 'send MY_QUEUE',
data: expect.objectContaining({
'messaging.system': 'cloudflare',
'messaging.destination.name': 'MY_QUEUE',
'messaging.operation.type': 'send',
'messaging.operation.name': 'send',
'messaging.batch.message_count': 3,
'sentry.origin': 'auto.faas.cloudflare.queue',
}),
});
})
.expect((envelope: Envelope) => {
expect(isConsumerTransaction(envelope)).toBe(true);
const tx = envelopeItem(envelope);
const trace = (tx.contexts as Record<string, Record<string, unknown>>).trace as Record<string, unknown>;
expect(trace).toMatchObject({
data: expect.objectContaining({
'messaging.batch.message_count': 3,
}),
});
})
.start(signal);

await runner.makeRequest('post', '/enqueue/batch');
await runner.completed();
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
{
"name": "worker-name",
"compatibility_date": "2025-06-17",
"main": "index.ts",
"compatibility_flags": ["nodejs_compat"],
"queues": {
"producers": [
{
"queue": "test-queue",
"binding": "MY_QUEUE",
},
],
"consumers": [
{
"queue": "test-queue",
"max_batch_size": 10,
"max_batch_timeout": 1,
"max_retries": 0,
},
],
},
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import type { CloudflareOptions } from '../../client';
import { isDurableObjectNamespace, isJSRPC } from '../../utils/isBinding';
import { isDurableObjectNamespace, isJSRPC, isQueue } from '../../utils/isBinding';
import { appendRpcMeta } from '../../utils/rpcMeta';
import { getEffectiveRpcPropagation } from '../../utils/rpcOptions';
import { instrumentDurableObjectNamespace, STUB_NON_RPC_METHODS } from '../instrumentDurableObjectNamespace';
import { instrumentFetcher } from './instrumentFetcher';
import { instrumentQueueProducer } from './instrumentQueueProducer';

function isProxyable(item: unknown): item is object {
return item !== null && (typeof item === 'object' || typeof item === 'function');
Expand All @@ -17,9 +18,10 @@ const instrumentedBindings = new WeakMap<object, unknown>();
*
* Currently detects:
* - DurableObjectNamespace (via `idFromName` duck-typing)
* - Service bindings / JSRPC proxies (wraps `fetch` for trace propagation)
* - Service bindings / JSRPC proxies
* - Queue producers (via `send` + `sendBatch` duck-typing)
*
* Extensible for future binding types (KV, D1, Queue, etc.).
* Extensible for future binding types (KV, D1, etc.).
*
* @param env - The Cloudflare env object to instrument
* @param options - Optional CloudflareOptions to control RPC trace propagation
Expand All @@ -31,12 +33,6 @@ export function instrumentEnv<Env extends Record<string, unknown>>(env: Env, opt

const rpcPropagation = options ? getEffectiveRpcPropagation(options) : false;

// As of now only trace propagation is used for the instrumentEnv
// so this is an optimization to avoid wrapping the env in a proxy if trace propagation is disabled
if (!rpcPropagation) {
return env;
}

return new Proxy(env, {
get(target, prop, receiver) {
const item = Reflect.get(target, prop, receiver);
Expand All @@ -51,6 +47,17 @@ export function instrumentEnv<Env extends Record<string, unknown>>(env: Env, opt
return cached;
}

if (isQueue(item)) {
const bindingName = typeof prop === 'string' ? prop : String(prop);
const instrumented = instrumentQueueProducer(item, bindingName);
instrumentedBindings.set(item, instrumented);
return instrumented;
}

if (!rpcPropagation) {
return item;
}

if (isDurableObjectNamespace(item)) {
const instrumented = instrumentDurableObjectNamespace(item);
instrumentedBindings.set(item, instrumented);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ function wrapQueueHandler(
'faas.trigger': 'pubsub',
'messaging.destination.name': batch.queue,
'messaging.system': 'cloudflare',
'messaging.operation.type': 'process',
'messaging.operation.name': 'process',
'messaging.batch.message_count': batch.messages.length,
'messaging.message.retry.count': batch.messages.reduce((acc, message) => acc + message.attempts - 1, 0),
[SEMANTIC_ATTRIBUTE_SENTRY_OP]: 'queue.process',
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
import type { MessageSendRequest, Queue, QueueSendBatchOptions, QueueSendOptions } from '@cloudflare/workers-types';
import { SEMANTIC_ATTRIBUTE_SENTRY_OP, SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN, startSpan } from '@sentry/core';

const ORIGIN = 'auto.faas.cloudflare.queue';

function startPublishSpan<T>(
options: {
bindingName: string;
bodySize: number | undefined;
messageCount?: number;
},
callback: () => T,
): T {
const { bindingName, bodySize, messageCount } = options;

return startSpan(
{
op: 'queue.publish',
name: `send ${bindingName}`,
attributes: {
'messaging.system': 'cloudflare',
'messaging.destination.name': bindingName,
'messaging.operation.type': 'send',
'messaging.operation.name': 'send',
...(messageCount !== undefined && { 'messaging.batch.message_count': messageCount }),
'messaging.message.body.size': bodySize,
[SEMANTIC_ATTRIBUTE_SENTRY_OP]: 'queue.publish',
[SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN]: ORIGIN,
},
},
callback,
);
}

function getBodySize(body: unknown): number | undefined {
if (body == null) {
return undefined;
}

if (typeof body === 'string') {
return new TextEncoder().encode(body).byteLength;
}

if (body instanceof ArrayBuffer) {
return body.byteLength;
}

if (ArrayBuffer.isView(body)) {
return body.byteLength;
}

try {
return new TextEncoder().encode(JSON.stringify(body)).byteLength;
} catch {
return undefined;
}
}

/**
* Wraps a Queue producer binding to create `queue.publish` spans on
* `send` and `sendBatch` calls.
*
* The queue's own name is not available on the binding object, so we use
* the env binding key (e.g. `MY_QUEUE`) as `messaging.destination.name`.
*/
export function instrumentQueueProducer<T extends Queue>(queue: T, bindingName: string): T {
return new Proxy(queue, {
get(target, prop, receiver) {
if (prop === 'send') {
const original = Reflect.get(target, prop, receiver) as Queue['send'];

return function (this: unknown, message: unknown, options?: QueueSendOptions): Promise<void> {
return startPublishSpan({ bindingName, bodySize: getBodySize(message) }, () =>
Reflect.apply(original, target, [message, options]),
);
};
}

if (prop === 'sendBatch') {
const original = Reflect.get(target, prop, receiver) as Queue['sendBatch'];
return function (
this: unknown,
messages: Iterable<MessageSendRequest>,
options?: QueueSendBatchOptions,
): Promise<void> {
const messageArray = Array.from(messages);
const totalBodySize = messageArray.reduce<number>((acc, m) => {
const size = getBodySize(m.body);
return size === undefined ? acc : acc + size;
}, 0);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sendBatch reports body size 0 instead of undefined

Low Severity

The sendBatch body size calculation uses reduce with an initial value of 0, so totalBodySize is always a number (minimum 0). When all messages have unsizable bodies (e.g., null or circular references), totalBodySize will be 0 and messaging.message.body.size gets set to 0. This is inconsistent with send, which correctly passes undefined from getBodySize when the body can't be sized, causing the attribute to be omitted. The sendBatch path emits a misleading 0 instead.

Fix in Cursor Fix in Web

Reviewed by Cursor Bugbot for commit b9b0533. Configure here.


return startPublishSpan({ bindingName, bodySize: totalBodySize, messageCount: messageArray.length }, () =>
Reflect.apply(original, target, [messageArray, options]),
);
};
}

return Reflect.get(target, prop, receiver);
},
});
}
10 changes: 9 additions & 1 deletion packages/cloudflare/src/utils/isBinding.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/

import type { DurableObjectNamespace } from '@cloudflare/workers-types';
import type { DurableObjectNamespace, Queue } from '@cloudflare/workers-types';

/**
* Checks if a value is a JSRPC proxy (service binding).
Expand Down Expand Up @@ -59,3 +59,11 @@ const isNotJSRPC = (item: unknown): item is Record<string, unknown> => !isJSRPC(
export function isDurableObjectNamespace(item: unknown): item is DurableObjectNamespace {
return item != null && isNotJSRPC(item) && typeof item.idFromName === 'function';
}

/**
* Duck-type check for Queue producer bindings.
* Queue has `send` and `sendBatch` async methods.
*/
export function isQueue(item: unknown): item is Queue {
return item != null && isNotJSRPC(item) && typeof item.send === 'function' && typeof item.sendBatch === 'function';
}
Loading
Loading