Draft
Changes from all commits (18 commits)
- 01c47ac: Debug tools template (1996fanrui, Sep 26, 2025)
- 62319f5: tmp version change (1996fanrui, Jan 14, 2026)
- 5c027b0: [hotfix] Extract RecordFilter as the interface (1996fanrui, Jan 15, 2026)
- 97dfc1c: [hotfix] Extract VirtualChannel as the public class (1996fanrui, Jan 15, 2026)
- 2f4927a: [hotfix] Including task name and subtask index into channel-state-uns… (1996fanrui, Jan 15, 2026)
- 188ffcd: [FLINK-38541][checkpoint] Introducing config option: execution.checkp… (1996fanrui, Jan 15, 2026)
- a5cb22f: [FLINK-38541][checkpoint] Randomize UNALIGNED_DURING_RECOVERY_ENABLED… (1996fanrui, Jan 15, 2026)
- b29729f: [FLINK-38930][checkpoint] Filtering record before processing without … (1996fanrui, Feb 18, 2026)
- 08e8682: [hotfix] Fix LocalInputChannel.getBuffersInUseCount to include toBeCo… (1996fanrui, Feb 18, 2026)
- c3b5d38: [FLINK-39018][checkpoint] Support LocalInputChannel checkpoint snapsh… (1996fanrui, Feb 18, 2026)
- f9008a4: [FLINK-39018][network] Fix LocalInputChannel priority event and buffe… (1996fanrui, Feb 18, 2026)
- 7b84206: [FLINK-38543][network] Buffer migration from RecoveredInputChannel to… (1996fanrui, Feb 18, 2026)
- 864d1ab: [FLINK-38543][checkpoint] Fix Mailbox loop interrupted before recover… (1996fanrui, Feb 18, 2026)
- 13b587a: [FLINK-38543][checkpoint] Introduce bufferFilteringCompleteFuture for… (1996fanrui, Feb 18, 2026)
- e53c06d: [FLINK-38543][checkpoint] Change overall UC restore process for check… (1996fanrui, Feb 18, 2026)
- c92f57e: [hotfix] Add FLIP-547 design documents, requirements, demo job, and t… (1996fanrui, Feb 18, 2026)
- ef122f4: Using heap buffer to avoid hanging if buffer pool is not available (1996fanrui, Feb 19, 2026)
- ccde5b1: Remove fixed port to avoid port conflict (1996fanrui, Feb 19, 2026)
68 changes: 68 additions & 0 deletions .claude/agents/flink-test-runner.md
@@ -0,0 +1,68 @@
---
name: flink-test-runner
description: Use this agent when code changes have been made in the Flink project and you need to run relevant tests to validate the changes. The agent will examine the CLAUDE.md file for test running instructions, execute appropriate tests for the code changes, and handle test failures by documenting them in a review file.\n\nExamples:\n<example>\nContext: User has made changes to Flink source code and wants to test them\nuser: "I've modified some streaming operators, please run the relevant tests"\nassistant: "I'll use the Task tool to launch the flink-test-runner agent to execute tests for your streaming operator changes"\n<commentary>\nSince the user has made code changes and wants to test them, use the flink-test-runner agent to handle the test execution process according to the project's CLAUDE.md guidelines.\n</commentary>\n</example>\n\n<example>\nContext: User has completed multiple code modifications and wants to validate them all at once\nuser: "I'm done with my changes to the Flink SQL parser, can you run tests now?"\nassistant: "I'll use the Task tool to launch the flink-test-runner agent to run tests for your SQL parser modifications"\n<commentary>\nThe user has indicated they've completed their changes and want to run tests, which matches exactly when this agent should be used - after all code modifications are complete.\n</commentary>\n</example>
model: inherit
---

You are a Flink Test Runner agent specialized in executing tests for the Apache Flink project according to the project's specific guidelines. Your primary responsibility is to run relevant tests after code changes have been made and handle test failures appropriately.

## Core Responsibilities

1. **Examine CLAUDE.md Instructions**: First, read and understand the test running guidelines from the project's CLAUDE.md file
2. **Identify Relevant Tests**: Determine which tests are related to the code changes that have been made
3. **Execute Targeted Tests**: Run only the tests relevant to the modified code, not the entire test suite
4. **Handle Test Failures**: If tests fail, document the failures and their causes in a review file
5. **Compilation Handling**: If tests fail due to compilation issues, run the Maven build command before retrying tests

## Operating Guidelines

### Test Execution Principles
- **Wait for Completion**: Only run tests after ALL code changes have been completed, not after each individual change
- **Avoid Global Tests**: NEVER run `mvn test` globally as it takes over 30 minutes
- **Targeted Testing**: Identify and run only tests related to the modified code
- **Minimize Compilation**: Avoid unnecessary compilation as it takes 5+ minutes

### Test Failure Handling
- When tests fail, create a review document containing:
- The specific test(s) that failed
- The exact error messages and stack traces
- Analysis of the failure cause
- Recommendations for fixing the issues

### Compilation Recovery
- If test failures appear to be caused by compilation issues:
- Execute: `./mvnw clean install -U -Pfast -DskipTests`
- Wait for compilation to complete (5+ minutes)
- Retry the failed tests

## Workflow

1. **Initial Assessment**:
- Read CLAUDE.md to understand current test guidelines
- Identify what code changes have been made
- Determine which tests are relevant

2. **Test Execution**:
- Run only the relevant tests using Maven
- Monitor test execution and results

3. **Result Handling**:
- If all tests pass: Report success
- If tests fail: Create comprehensive review document
- If compilation issues: Run build command and retry

4. **Documentation**:
- Maintain clear records of test executions
- Document all failures with detailed analysis
- Provide actionable recommendations for fixes

## Output Format

For successful test runs: Provide a clear summary of tests executed and their status

For failed test runs: Create a review document with:
- List of failed tests
- Error details and stack traces
- Root cause analysis
- Suggested fixes
- Next steps recommendation
1 change: 1 addition & 0 deletions .gitignore
@@ -22,6 +22,7 @@ logs.zip
.mvn/maven.config
target
tmp
log/*
*.class
*.iml
*.swp
71 changes: 71 additions & 0 deletions CLAUDE.md
@@ -0,0 +1,71 @@
## Code Generation Rules

The main agent is not allowed to write code; all code must be written by sub-agents, such as flink-code-developer.

Strictly follow these rules:
- Use the TDD development model: before modifying or adding code, complete the test code changes first.
- Once all code changes are complete and before finishing coding, be sure to run the mvn tests related to the modified code. If there are failures, fix them automatically and stop only after the failures are resolved.

Notes:
1. If one round of work modifies code in several places, do not run tests after each individual change. After all changes in the round are complete, have the flink-test-runner sub-agent run the relevant tests, to avoid running tests too many times.
2. Do not run the global mvn test: the global test run for this project is extremely slow and can take more than 30 minutes.
3. If a test run fails and the failure is caused by compilation errors, you may run ./mvnw clean install -U -Pfast -DskipTests -P java11-target -P java11 to compile, then run the relevant tests again. Compilation also takes more than 5 minutes, so avoid recompiling repeatedly.
4. Tests must be run by the flink-test-runner sub-agent; the main agent is the coordinator and does not do development or testing itself.

Note: the -P java11-target -P java11 flags must be added to run with Java 11.

# Workflow

<workflow>
0. If you determine that my input is a new requirement, and the requirement is in the MANUAL_REQUIREMENT.md document, you may work independently following the standard software-engineering process below, asking me only when necessary.
1. Whenever I submit a new requirement, to ensure the quality of the requirement and its acceptance criteria, first clarify the problem and the requirement before moving to the next phase.
2. Requirements document and acceptance-criteria design: first complete the requirements design, described using the EARS (Easy Approach to Requirements Syntax) method, and save it as `specs/requirements.md` in the same directory as MANUAL_REQUIREMENT.md. Confirm it with me; once the requirements are finalized, move to the next phase. Reference format:

```markdown
# Requirements Document

## Introduction

Requirement description

## Requirements

### Requirement 1 - Requirement name

**User story:** user story content

#### Acceptance criteria
```

1. Requirements elaboration: use EARS-style clauses of the form While <optional precondition>, when <optional trigger>, the <system name> shall <system response>. For example: When "mute" is selected, the laptop shall suppress all audio output.
2. Requirements review: after the requirements are drafted, automatically take on the role of a product manager and review whether every requirement design meets the functional expectations, whether the interactions are user-friendly, and whether exceptional cases are handled gracefully. Record anything that falls short in a pm_review.md document in the same directory as `specs/requirements.md`, with each item markable as done or not done.
3. Requirements improvement: based on the suggestions in pm_review.md, improve the `specs/requirements.md` generated in the first step. If a suggestion is reasonable, update the requirements document; if not, explain why. After the changes are made, mark the corresponding suggestions as done.
4. Repeat steps 2 and 3 until the product manager is fully satisfied with all the requirement designs. Wait for my manual confirmation before proceeding to later steps.
5. Technical design: after the requirements design is complete, design the technical solution based on the current technical architecture and the previously confirmed requirements, and save it as `specs/design.md` in the same directory as MANUAL_REQUIREMENT.md. It should be concise yet accurately describe the technical architecture (e.g. architecture, tech stack, technology choices, database/interface design, test strategy, security), using mermaid diagrams where necessary. Confirm it with me before moving to the next phase.
6. Technical design review: this process also has multiple reviewers.
System Architect: focuses on high-level design, technology choices, scalability, performance, and interactions between modules. Review comments on anything that falls short go into `specs/Architect_review.md`.
Software Engineer / Code Reviewer: focuses on implementation details, code decoupling, readability, maintainability, whether the API design is reasonable, and whether project conventions are followed. Review comments on anything that falls short go into `specs/software_review.md`.
Test/QA Engineer: focuses on the testability of the solution, how to design test cases, edge-case handling, and the automated-testing strategy. Review comments on anything that falls short go into `specs/qa_review.md`.
7. Technical design refinement: evaluate every suggestion from all reviewers in the previous step one by one, decide whether to change the design, and update the progress in the review documents.
8. Repeat steps 6 and 7 until all reviewers are fully satisfied with the entire technical design. Wait for my manual confirmation before proceeding to later steps.
9. Task breakdown: after the technical design is complete, break the work down into concrete tasks based on the requirements document and the technical design, and save them as `specs/tasks.md` in the same directory as MANUAL_REQUIREMENT.md. Confirm them with me, then start executing the tasks, keeping task status up to date as you go. Work as autonomously as possible during execution to ensure efficiency and quality.

Notes for the reviewer roles:
1. Each time a reviewer re-reviews, they should evaluate from a newcomer's perspective, which may surface issues missed in earlier reviews, rather than merely checking whether previously raised issues were resolved. Only then does repeated review add value.
2. Reviewers must not be misled by the design document. The initial design may be poor, causing the review to revolve around a bad solution. Just as everyone should think independently before discussing a problem, hearing one person's proposal first can bias all reviewers toward the initial design.
3. The results of each review round must be saved into the review documents; do not let the next round overwrite the previous round's results.

Note: if a requirement modifies an existing one, the documents under the specs directory must not contain only the new changes; they must build on the previous documents. In other words, the documents under specs must cover all requirements, both old and new.


Task reference format:

```markdown
# Implementation Plan

- [ ] 1. Task information
  - Specific things to do
  - ...
  - _Requirements: IDs of the related requirement items

```
</workflow>
2 changes: 1 addition & 1 deletion flink-annotations/pom.xml
@@ -25,7 +25,7 @@ under the License.
<parent>
<groupId>org.apache.flink</groupId>
<artifactId>flink-parent</artifactId>
-<version>2.3-SNAPSHOT</version>
+<version>os-troubleshooting-b55557f2dfe</version>
</parent>

<artifactId>flink-annotations</artifactId>
@@ -26,7 +26,7 @@ under the License.
<parent>
<groupId>org.apache.flink</groupId>
<artifactId>flink-architecture-tests</artifactId>
-<version>2.3-SNAPSHOT</version>
+<version>os-troubleshooting-b55557f2dfe</version>
</parent>

<artifactId>flink-architecture-tests-base</artifactId>
@@ -26,7 +26,7 @@ under the License.
<parent>
<groupId>org.apache.flink</groupId>
<artifactId>flink-architecture-tests</artifactId>
-<version>2.3-SNAPSHOT</version>
+<version>os-troubleshooting-b55557f2dfe</version>
</parent>

<artifactId>flink-architecture-tests-production</artifactId>
@@ -26,7 +26,7 @@ under the License.
<parent>
<groupId>org.apache.flink</groupId>
<artifactId>flink-architecture-tests</artifactId>
-<version>2.3-SNAPSHOT</version>
+<version>os-troubleshooting-b55557f2dfe</version>
</parent>

<artifactId>flink-architecture-tests-test</artifactId>
2 changes: 1 addition & 1 deletion flink-architecture-tests/pom.xml
@@ -26,7 +26,7 @@ under the License.
<parent>
<groupId>org.apache.flink</groupId>
<artifactId>flink-parent</artifactId>
-<version>2.3-SNAPSHOT</version>
+<version>os-troubleshooting-b55557f2dfe</version>
</parent>

<artifactId>flink-architecture-tests</artifactId>
2 changes: 1 addition & 1 deletion flink-clients/pom.xml
@@ -26,7 +26,7 @@ under the License.
<parent>
<groupId>org.apache.flink</groupId>
<artifactId>flink-parent</artifactId>
-<version>2.3-SNAPSHOT</version>
+<version>os-troubleshooting-b55557f2dfe</version>
</parent>

<artifactId>flink-clients</artifactId>
2 changes: 1 addition & 1 deletion flink-connectors/flink-connector-base/pom.xml
@@ -25,7 +25,7 @@
<parent>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connectors</artifactId>
-<version>2.3-SNAPSHOT</version>
+<version>os-troubleshooting-b55557f2dfe</version>
</parent>

<artifactId>flink-connector-base</artifactId>
2 changes: 1 addition & 1 deletion flink-connectors/flink-connector-datagen-test/pom.xml
@@ -25,7 +25,7 @@
<parent>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connectors</artifactId>
-<version>2.3-SNAPSHOT</version>
+<version>os-troubleshooting-b55557f2dfe</version>
</parent>

<artifactId>flink-connector-datagen-tests</artifactId>
2 changes: 1 addition & 1 deletion flink-connectors/flink-connector-datagen/pom.xml
@@ -25,7 +25,7 @@
<parent>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connectors</artifactId>
-<version>2.3-SNAPSHOT</version>
+<version>os-troubleshooting-b55557f2dfe</version>
</parent>

<artifactId>flink-connector-datagen</artifactId>
2 changes: 1 addition & 1 deletion flink-connectors/flink-connector-files/pom.xml
@@ -26,7 +26,7 @@ under the License.
<parent>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connectors</artifactId>
-<version>2.3-SNAPSHOT</version>
+<version>os-troubleshooting-b55557f2dfe</version>
</parent>

<artifactId>flink-connector-files</artifactId>
2 changes: 1 addition & 1 deletion flink-connectors/flink-file-sink-common/pom.xml
@@ -25,7 +25,7 @@ under the License.
<parent>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connectors</artifactId>
-<version>2.3-SNAPSHOT</version>
+<version>os-troubleshooting-b55557f2dfe</version>
</parent>

<artifactId>flink-file-sink-common</artifactId>
2 changes: 1 addition & 1 deletion flink-connectors/flink-hadoop-compatibility/pom.xml
@@ -25,7 +25,7 @@ under the License.
<parent>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connectors</artifactId>
-<version>2.3-SNAPSHOT</version>
+<version>os-troubleshooting-b55557f2dfe</version>
</parent>

<artifactId>flink-hadoop-compatibility</artifactId>
2 changes: 1 addition & 1 deletion flink-connectors/pom.xml
@@ -26,7 +26,7 @@ under the License.
<parent>
<groupId>org.apache.flink</groupId>
<artifactId>flink-parent</artifactId>
-<version>2.3-SNAPSHOT</version>
+<version>os-troubleshooting-b55557f2dfe</version>
</parent>


2 changes: 1 addition & 1 deletion flink-container/pom.xml
@@ -24,7 +24,7 @@ under the License.
<parent>
<groupId>org.apache.flink</groupId>
<artifactId>flink-parent</artifactId>
-<version>2.3-SNAPSHOT</version>
+<version>os-troubleshooting-b55557f2dfe</version>
</parent>

<artifactId>flink-container</artifactId>
2 changes: 1 addition & 1 deletion flink-core-api/pom.xml
@@ -25,7 +25,7 @@ under the License.
<parent>
<groupId>org.apache.flink</groupId>
<artifactId>flink-parent</artifactId>
-<version>2.3-SNAPSHOT</version>
+<version>os-troubleshooting-b55557f2dfe</version>
</parent>

<artifactId>flink-core-api</artifactId>
2 changes: 1 addition & 1 deletion flink-core/pom.xml
@@ -25,7 +25,7 @@ under the License.
<parent>
<groupId>org.apache.flink</groupId>
<artifactId>flink-parent</artifactId>
-<version>2.3-SNAPSHOT</version>
+<version>os-troubleshooting-b55557f2dfe</version>
</parent>

<artifactId>flink-core</artifactId>
@@ -647,6 +647,7 @@ public class CheckpointingOptions {
+ "Each subtask will create a new channel state file when this is configured to 1.");

@Experimental
@Documentation.Section(Documentation.Sections.COMMON_CHECKPOINTING)
public static final ConfigOption<Boolean> UNALIGNED_RECOVER_OUTPUT_ON_DOWNSTREAM =
ConfigOptions.key(
"execution.checkpointing.unaligned.recover-output-on-downstream.enabled")
@@ -656,6 +657,39 @@ public class CheckpointingOptions {
"Whether recovering output buffers of upstream task on downstream task directly "
+ "when job restores from the unaligned checkpoint.");

/**
* Whether to enable checkpointing during recovery from an unaligned checkpoint.
*
* <p>When enabled, the job can take checkpoints while still recovering channel state (inflight
* data) from a previous unaligned checkpoint. This avoids the need to wait for full recovery
* before the first checkpoint can be triggered, which reduces the window of vulnerability to
* failures during recovery.
*
* <p>This option requires {@link #UNALIGNED_RECOVER_OUTPUT_ON_DOWNSTREAM} to be enabled. It
* does not require unaligned checkpoints to be currently enabled, because a job may restore
* from an unaligned checkpoint while having unaligned checkpoints disabled for the new
* execution.
*/
@Experimental
@Documentation.Section(Documentation.Sections.COMMON_CHECKPOINTING)
public static final ConfigOption<Boolean> UNALIGNED_DURING_RECOVERY_ENABLED =
ConfigOptions.key("execution.checkpointing.unaligned.during-recovery.enabled")
.booleanType()
.defaultValue(false)
.withDescription(
Description.builder()
.text(
"Whether to enable checkpointing during recovery from an unaligned checkpoint. "
+ "When enabled, the job can take checkpoints while still recovering channel state "
+ "(inflight data), reducing the window of vulnerability to failures during recovery.")
.linebreak()
.linebreak()
.text(
"This option requires %s to be enabled.",
TextElement.code(
UNALIGNED_RECOVER_OUTPUT_ON_DOWNSTREAM.key()))
.build());

/**
* Determines whether checkpointing is enabled based on the configuration.
*
@@ -751,4 +785,24 @@ public static boolean isUnalignedCheckpointInterruptibleTimersEnabled(Configurat
}
return config.get(ENABLE_UNALIGNED_INTERRUPTIBLE_TIMERS);
}

/**
* Determines whether unaligned checkpoint support during recovery is enabled.
*
* <p>This feature requires {@link #UNALIGNED_RECOVER_OUTPUT_ON_DOWNSTREAM} to be enabled. Note
* that it does not require unaligned checkpoints to be currently enabled, because a job may
* restore from an unaligned checkpoint while having unaligned checkpoints disabled for the new
* execution.
*
* @param config the configuration to check
* @return {@code true} if unaligned checkpoint during recovery is enabled, {@code false}
* otherwise
*/
@Internal
public static boolean isUnalignedDuringRecoveryEnabled(Configuration config) {
if (!config.get(UNALIGNED_RECOVER_OUTPUT_ON_DOWNSTREAM)) {
return false;
}
return config.get(UNALIGNED_DURING_RECOVERY_ENABLED);
}
}
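The gating logic of `isUnalignedDuringRecoveryEnabled` can be illustrated with a stand-alone sketch. This is a mock for illustration only: a plain `Map<String, Boolean>` stands in for Flink's `Configuration`, and the class name `RecoveryGateSketch` is hypothetical.

```java
import java.util.Map;

// Illustrative sketch only: a plain Map stands in for Flink's Configuration.
public class RecoveryGateSketch {

    static boolean isUnalignedDuringRecoveryEnabled(Map<String, Boolean> conf) {
        // Checkpointing during recovery is effective only when
        // recover-output-on-downstream is also enabled.
        if (!conf.getOrDefault(
                "execution.checkpointing.unaligned.recover-output-on-downstream.enabled",
                false)) {
            return false;
        }
        return conf.getOrDefault(
                "execution.checkpointing.unaligned.during-recovery.enabled", false);
    }

    public static void main(String[] args) {
        // Enabling during-recovery alone is not enough; the prerequisite gates it off.
        System.out.println(isUnalignedDuringRecoveryEnabled(Map.of(
                "execution.checkpointing.unaligned.during-recovery.enabled", true)));

        // With both options enabled, the feature is considered enabled.
        System.out.println(isUnalignedDuringRecoveryEnabled(Map.of(
                "execution.checkpointing.unaligned.recover-output-on-downstream.enabled", true,
                "execution.checkpointing.unaligned.during-recovery.enabled", true)));
    }
}
```

Note that both defaults are false, matching the config option definitions above, so the feature is opt-in twice over.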
@@ -45,7 +45,7 @@ public class HeartbeatManagerOptions {
public static final ConfigOption<Duration> HEARTBEAT_TIMEOUT =
key("heartbeat.timeout")
.durationType()
-.defaultValue(Duration.ofMillis(50000L))
+.defaultValue(Duration.ofHours(2))
.withDescription(
"Timeout for requesting and receiving heartbeats for both sender and receiver sides.");

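To put this change in perspective, a quick `java.time.Duration` calculation (the class name below is hypothetical; presumably the 2-hour timeout is a debugging convenience on this branch, since such a long heartbeat timeout would delay failure detection in production):

```java
import java.time.Duration;

// Compare the previous default heartbeat timeout with this branch's value.
public class HeartbeatTimeoutRatio {
    public static void main(String[] args) {
        Duration previousDefault = Duration.ofMillis(50000L); // 50 s
        Duration branchDefault = Duration.ofHours(2);         // 7,200,000 ms
        long ratio = branchDefault.toMillis() / previousDefault.toMillis();
        System.out.println(ratio); // prints 144
    }
}
```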
2 changes: 1 addition & 1 deletion flink-datastream-api/pom.xml
@@ -26,7 +26,7 @@ under the License.
<parent>
<artifactId>flink-parent</artifactId>
<groupId>org.apache.flink</groupId>
-<version>2.3-SNAPSHOT</version>
+<version>os-troubleshooting-b55557f2dfe</version>
</parent>

<artifactId>flink-datastream-api</artifactId>
2 changes: 1 addition & 1 deletion flink-datastream/pom.xml
@@ -26,7 +26,7 @@ under the License.
<parent>
<artifactId>flink-parent</artifactId>
<groupId>org.apache.flink</groupId>
-<version>2.3-SNAPSHOT</version>
+<version>os-troubleshooting-b55557f2dfe</version>
</parent>

<artifactId>flink-datastream</artifactId>