Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions src/cloudflare/internal/test/workflows/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,16 @@ wd_test(
args = ["--experimental"],
data = glob(["*.js"]),
)

wd_test(
src = "workflows-step-promise-test.wd-test",
args = ["--experimental"],
data = glob(["*.js"]),
)

wd_test(
src = "workflows-no-rollback-test.wd-test",
args = ["--experimental"],
data = glob(["*.js"]),
# Flag-disabled regression: same JS, no workflows_step_rollback flag.
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright (c) 2026 Cloudflare, Inc.
// Licensed under the Apache 2.0 license found in the LICENSE file or at:
// https://opensource.org/licenses/Apache-2.0

import { RpcTarget } from 'cloudflare:workers';
import assert from 'node:assert';

// Minimal mock step for the flag-disabled path.
class MockStep extends RpcTarget {
calls = [];

async do(...args) {
const callback = typeof args[1] === 'function' ? args[1] : args[2];
this.calls.push({ method: 'do', name: args[0] });
return await callback();
}

async sleep(name) {
this.calls.push({ method: 'sleep', name });
}

async sleepUntil(name) {
this.calls.push({ method: 'sleepUntil', name });
}

async waitForEvent(name) {
this.calls.push({ method: 'waitForEvent', name });
return { payload: 'data', timestamp: '2024-01-01' };
}
}

// Without workflows_step_rollback, step is the raw RPC stub.
// All basic step methods still work without wrapping.
export const noRollbackWithoutFlag = {
async test(_, env) {
const mock = new MockStep();
const result = await env.BasicWorkflow.run({ payload: 'test' }, mock);

assert.strictEqual(result.doResult, 'hello');
assert.strictEqual(result.slept, true);
assert.strictEqual(result.waited, true);
assert.strictEqual(mock.calls.length, 4);
},
};
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
using Workerd = import "/workerd/workerd.capnp";

# Regression test: exercises workflows WITHOUT the workflows_step_rollback flag.
# Reuses BasicWorkflow from workflows-step-promise-workflows.js but does not
# enable the flag, verifying the unwrapped step RPC stub works normally.

const unitTests :Workerd.Config = (
services = [
( name = "workflows-no-rollback-test",
worker = .testWorker,
),
( name = "workflows",
worker = .workflowsWorker,
),
]
);

const testWorker :Workerd.Worker = (
modules = [
(name = "worker", esModule = embed "workflows-no-rollback-test.js"),
],
compatibilityFlags = ["nodejs_compat", "experimental"],
bindings = [
(name = "BasicWorkflow", service = (name = "workflows", entrypoint = "BasicWorkflow")),
],
);

const workflowsWorker :Workerd.Worker = (
modules = [
(name = "worker", esModule = embed "workflows-step-promise-workflows.js"),
],
compatibilityFlags = ["nodejs_compat", "experimental"],
);
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
// Copyright (c) 2026 Cloudflare, Inc.
// Licensed under the Apache 2.0 license found in the LICENSE file or at:
// https://opensource.org/licenses/Apache-2.0

import { RpcTarget } from 'cloudflare:workers';
import assert from 'node:assert';

// Mock step that records calls for verification.
class MockStep extends RpcTarget {
calls = [];

async do(...args) {
const name = args[0];
let callback,
rollbackFn = null,
rollbackConfig = null;
if (typeof args[1] === 'function') {
callback = args[1];
rollbackFn = args[2] || null;
rollbackConfig = args[3] || null;
} else {
callback = args[2];
rollbackFn = args[3] || null;
rollbackConfig = args[4] || null;
}
this.calls.push({
method: 'do',
name,
hadRollbackFn: rollbackFn !== null,
hadRollbackConfig: rollbackConfig !== null,
rollbackConfig,
});
return await callback();
}

async sleep(name, duration) {
this.calls.push({ method: 'sleep', name, duration });
}

async sleepUntil(name, timestamp) {
this.calls.push({ method: 'sleepUntil', name, timestamp });
}

async waitForEvent(...args) {
this.calls.push({
method: 'waitForEvent',
name: args[0],
hadRollbackFn: (args[2] || null) !== null,
});
return {
payload: 'event-data',
timestamp: '2024-01-01',
type: args[1]?.type,
};
}
}

// Mock step that invokes the rollback fn, simulating engine compensation.
// Captures the args passed to the rollback fn so the test can verify them.
class RollbackCallingMockStep extends RpcTarget {
rollbackArgs = null;

async do(...args) {
const name = args[0];
const callback = typeof args[1] === 'function' ? args[1] : args[2];
const rollbackFn =
typeof args[1] === 'function' ? args[2] || null : args[3] || null;
const result = await callback();
if (rollbackFn !== null) {
const ctx = {
error: 'simulated-error',
output: result,
stepName: name,
};
await rollbackFn(ctx);
this.rollbackArgs = ctx;
}
return result;
}

async sleep() {}
async sleepUntil() {}
async waitForEvent(...args) {
return {
payload: 'event-data',
timestamp: '2024-01-01',
type: args[1]?.type,
};
}
}

// Verifies all forwarding paths: do (no rollback), do+rollback(fn),
// do+rollback(config,fn), sleep, sleepUntil, waitForEvent+rollback.
export const forwarding = {
async test(_, env) {
const mock = new MockStep();
const result = await env.ForwardingWorkflow.run({ payload: 'test' }, mock);

assert.strictEqual(result.basic, 'basic-result');
assert.strictEqual(result.withFn, 'fn-result');
assert.strictEqual(result.withConfig, 'config-result');
assert.strictEqual(result.waited.type, 'approval');

// 6 calls total: 3x do, sleep, sleepUntil, waitForEvent
assert.strictEqual(mock.calls.length, 6);

// do('basic') — no rollback args
assert.strictEqual(mock.calls[0].hadRollbackFn, false);

// do('with-fn') — rollback fn, no config
assert.strictEqual(mock.calls[1].hadRollbackFn, true);
assert.strictEqual(mock.calls[1].hadRollbackConfig, false);

// do('with-config') — rollback fn + config
assert.strictEqual(mock.calls[2].hadRollbackFn, true);
assert.strictEqual(mock.calls[2].hadRollbackConfig, true);
assert.deepStrictEqual(mock.calls[2].rollbackConfig, {
retries: { limit: 3, delay: '5 seconds', backoff: 'linear' },
timeout: '30 seconds',
});

// sleep / sleepUntil pass through
assert.strictEqual(mock.calls[3].method, 'sleep');
assert.strictEqual(mock.calls[4].method, 'sleepUntil');

// waitForEvent with rollback
assert.strictEqual(mock.calls[5].method, 'waitForEvent');
assert.strictEqual(mock.calls[5].hadRollbackFn, true);
},
};

// .rollback() guards: called twice, after await, with invalid arg
export const errorGuards = {
async test(_, env) {
const mock = new MockStep();
const result = await env.ErrorGuardsWorkflow.run({ payload: 'test' }, mock);

assert.strictEqual(result.errors.length, 3);
assert.match(result.errors[0], /can only be called once/);
assert.match(result.errors[1], /must be called before the step is awaited/);
assert.match(result.errors[2], /expects a function/);
},
};

// Step callback error surfaces through StepPromise
export const errorPropagation = {
async test(_, env) {
const mock = new MockStep();
const result = await env.ErrorPropagationWorkflow.run(
{ payload: 'test' },
mock
);
assert.strictEqual(result.threw, true);
assert.strictEqual(result.message, 'step-boom');
},
};

// Rollback fn is callable over RPC with correct context
export const rollbackCallable = {
async test(_, env) {
const mock = new RollbackCallingMockStep();
const result = await env.RollbackCallableWorkflow.run(
{ payload: 'test' },
mock
);
assert.strictEqual(result.value, 'step-output');
assert.strictEqual(mock.rollbackArgs.output, 'step-output');
assert.strictEqual(mock.rollbackArgs.stepName, 'my-step');
assert.strictEqual(mock.rollbackArgs.error, 'simulated-error');
},
};
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
using Workerd = import "/workerd/workerd.capnp";

const unitTests :Workerd.Config = (
services = [
( name = "workflows-step-promise-test",
worker = .testWorker,
),
( name = "workflows",
worker = .workflowsWorker,
),
]
);

const testWorker :Workerd.Worker = (
modules = [
(name = "worker", esModule = embed "workflows-step-promise-test.js"),
],
compatibilityFlags = ["nodejs_compat", "experimental", "workflows_step_rollback"],
bindings = [
(name = "ForwardingWorkflow", service = (name = "workflows", entrypoint = "ForwardingWorkflow")),
(name = "ErrorGuardsWorkflow", service = (name = "workflows", entrypoint = "ErrorGuardsWorkflow")),
(name = "ErrorPropagationWorkflow", service = (name = "workflows", entrypoint = "ErrorPropagationWorkflow")),
(name = "RollbackCallableWorkflow", service = (name = "workflows", entrypoint = "RollbackCallableWorkflow")),
],
);

const workflowsWorker :Workerd.Worker = (
modules = [
(name = "worker", esModule = embed "workflows-step-promise-workflows.js"),
],
compatibilityFlags = ["nodejs_compat", "experimental", "workflows_step_rollback"],
);
Loading
Loading