Skip to content

Commit

Permalink
Rewrite RPC (#5050)
Browse files Browse the repository at this point in the history
  • Loading branch information
jkschneider authored Feb 26, 2025
1 parent 8803f88 commit 4f0f666
Show file tree
Hide file tree
Showing 69 changed files with 2,659 additions and 223 deletions.
3 changes: 3 additions & 0 deletions rewrite-core/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ dependencies {
implementation("io.github.classgraph:classgraph:latest.release")
implementation("org.yaml:snakeyaml:latest.release")

implementation("io.moderne:jsonrpc:latest.release")
implementation("org.objenesis:objenesis:latest.release")

testImplementation("org.assertj:assertj-core:latest.release")
testImplementation(project(":rewrite-test"))
}
84 changes: 47 additions & 37 deletions rewrite-core/src/main/java/org/openrewrite/DataTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,8 @@
package org.openrewrite;

import com.fasterxml.jackson.annotation.JsonIgnoreType;
import com.fasterxml.jackson.core.type.TypeReference;
import lombok.Getter;
import lombok.Setter;
import lombok.RequiredArgsConstructor;
import org.intellij.lang.annotations.Language;

import java.lang.reflect.ParameterizedType;
Expand All @@ -27,67 +26,78 @@
import java.util.concurrent.ConcurrentHashMap;

/**
* @param <Row> The model type for a single row of this extract.
* @param <Row> The model type for a single row of this data table.
*/
@Getter
@Incubating(since = "7.35.0")
@JsonIgnoreType
@RequiredArgsConstructor
public class DataTable<Row> {
private final String name;
private final Class<Row> type;

@Language("markdown")
private final @NlsRewrite.DisplayName String displayName;

@Language("markdown")
private final @NlsRewrite.Description String description;

@Setter
private boolean enabled = true;

/**
* Ignore any row insertions after this cycle. This prevents
* data table producing recipes from having to keep track of state across
* multiple cycles to prevent duplicate row entries.
* @param recipe The recipe that this data table is associated with.
* @param type The model type for a single row of this data table.
* @param name The name of this data table.
* @param displayName The display name of this data table.
* @param description The description of this data table.
* @deprecated Use {@link #DataTable(Recipe, String, String)} instead.
*/
protected int maxCycle = 1;

@SuppressWarnings("unused")
@Deprecated
public DataTable(Recipe recipe, Class<Row> type, String name,
@Language("markdown") String displayName,
@Language("markdown") String description) {
this.type = type;
this.name = name;
@NlsRewrite.DisplayName @Language("markdown") String displayName,
@NlsRewrite.Description @Language("markdown") String description) {
this.displayName = displayName;
this.description = description;
recipe.addDataTable(this);
}

/**
* Construct a new data table.
*
* @param recipe The recipe that this data table is associated with.
* @param displayName The display name of this data table.
* @param description The description of this data table.
*/
public DataTable(Recipe recipe,
@Language("markdown") String displayName,
@Language("markdown") String description) {
//noinspection unchecked
this.type = (Class<Row>) ((ParameterizedType) getClass().getGenericSuperclass())
.getActualTypeArguments()[0];
this.name = getClass().getName();
@NlsRewrite.DisplayName @Language("markdown") String displayName,
@NlsRewrite.Description @Language("markdown") String description) {
this.displayName = displayName;
this.description = description;
recipe.addDataTable(this);

// Only null when transferring DataTables over RPC.
//noinspection ConstantValue
if (recipe != null) {
recipe.addDataTable(this);
}
}

@SuppressWarnings("unused")
public TypeReference<List<Row>> getRowsTypeReference() {
return new TypeReference<List<Row>>() {
};
public Class<Row> getType() {
//noinspection unchecked
return (Class<Row>) ((ParameterizedType) getClass().getGenericSuperclass())
.getActualTypeArguments()[0];
}

public String getName() {
return getClass().getName();
}

public void insertRow(ExecutionContext ctx, Row row) {
if (enabled && ctx.getCycle() <= maxCycle) {
ctx.computeMessage(ExecutionContext.DATA_TABLES, row, ConcurrentHashMap::new, (extract, allDataTables) -> {
//noinspection unchecked
List<Row> dataTablesOfType = (List<Row>) allDataTables.computeIfAbsent(this, c -> new ArrayList<>());
dataTablesOfType.add(row);
return allDataTables;
});
// Ignore any row insertions after this cycle. This prevents
// data table producing recipes from having to keep track of state across
// multiple cycles to prevent duplicate row entries.
if (ctx.getCycle() > 1) {
return;
}
ctx.computeMessage(ExecutionContext.DATA_TABLES, row, ConcurrentHashMap::new, (extract, allDataTables) -> {
//noinspection unchecked
List<Row> dataTablesOfType = (List<Row>) allDataTables.computeIfAbsent(this, c -> new ArrayList<>());
dataTablesOfType.add(row);
return allDataTables;
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,26 @@
*/
package org.openrewrite;

import lombok.Getter;
import org.jspecify.annotations.Nullable;

import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

public class DelegatingExecutionContext implements ExecutionContext {
@Getter
private final ExecutionContext delegate;

public DelegatingExecutionContext(ExecutionContext delegate) {
this.delegate = delegate;
}

@Override
public @Nullable Map<String, Object> getMessages() {
return delegate.getMessages();
}

@Override
public void putMessage(String key, @Nullable Object value) {
delegate.putMessage(key, value);
Expand Down
80 changes: 75 additions & 5 deletions rewrite-core/src/main/java/org/openrewrite/ExecutionContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,26 @@
package org.openrewrite;

import org.jspecify.annotations.Nullable;
import org.openrewrite.rpc.RpcCodec;
import org.openrewrite.rpc.RpcReceiveQueue;
import org.openrewrite.rpc.RpcSendQueue;
import org.openrewrite.rpc.request.Visit;
import org.openrewrite.scheduling.RecipeRunCycle;

import java.util.*;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.*;

import static java.util.Collections.emptyList;
import static java.util.Collections.emptyMap;
import static java.util.Objects.requireNonNull;

/**
* Passes messages between individual visitors or parsing operations and allows errors to be propagated
* back to the process controlling parsing or recipe execution.
*/
public interface ExecutionContext {
public interface ExecutionContext extends RpcCodec<ExecutionContext> {
String CURRENT_CYCLE = "org.openrewrite.currentCycle";
String CURRENT_RECIPE = "org.openrewrite.currentRecipe";
String DATA_TABLES = "org.openrewrite.dataTables";
Expand All @@ -49,6 +54,9 @@ default Set<TreeObserver.Subscription> getObservers() {
return getMessage("org.openrewrite.internal.treeObservers", Collections.emptySet());
}

@Nullable
Map<String, Object> getMessages();

void putMessage(String key, @Nullable Object value);

<T> @Nullable T getMessage(String key);
Expand Down Expand Up @@ -107,4 +115,66 @@ default int getCycle() {
default RecipeRunCycle<?> getCycleDetails() {
return requireNonNull(getMessage(CURRENT_CYCLE));
}

/**
* The after state will change if any messages have changed by a call to clone in the
* {@link Visit.Handler} implementation.
*/
@Override
default void rpcSend(ExecutionContext after, RpcSendQueue q) {
// The after state will change if any messages have changed by a call to clone
q.getAndSend(after, ctx -> {
Map<String, Object> messages = new HashMap<>(ctx.getMessages() == null ?
emptyMap() : ctx.getMessages());
// The remote side will manage its own recipe and cycle state.
messages.remove(CURRENT_CYCLE);
messages.remove(CURRENT_RECIPE);
messages.remove(DATA_TABLES);
return messages;
});

Map<DataTable<?>, List<?>> dt = after.getMessage(DATA_TABLES);
q.getAndSendList(after, sendWholeList(dt == null ? null : dt.keySet()), DataTable::getName, null);
if (dt != null) {
for (List<?> rowSet : dt.values()) {
q.getAndSendList(after, sendWholeList(rowSet),
row -> Integer.toString(System.identityHashCode(row)),
null);
}

}
}

@Override
default ExecutionContext rpcReceive(ExecutionContext before, RpcReceiveQueue q) {
Map<String, Object> messages = q.receive(before.getMessages());
for (Map.Entry<String, Object> e : messages.entrySet()) {
before.putMessage(e.getKey(), e.getValue());
}

List<DataTable<?>> dataTables = q.receiveList(emptyList(), null);
//noinspection ConstantValue
if (dataTables != null) {
for (DataTable<?> dataTable : dataTables) {
List<?> rows = q.receiveList(emptyList(), null);
before.computeMessage(ExecutionContext.DATA_TABLES, rows, ConcurrentHashMap::new, (extract, allDataTables) -> {
//noinspection unchecked
List<Object> dataTablesOfType = (List<Object>) allDataTables.computeIfAbsent(dataTable, c -> new ArrayList<>());
dataTablesOfType.addAll(rows);
return allDataTables;
});
}
}
return before;
}

static <T> Function<ExecutionContext, @Nullable List<T>> sendWholeList(@Nullable Collection<T> list) {
AtomicBoolean retrievedAfter = new AtomicBoolean(false);
return ctx -> {
if (!retrievedAfter.getAndSet(true)) {
return list == null ? null : new ArrayList<>(list);
}
return null;
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
@Value
@EqualsAndHashCode(callSuper = false)
public class FindCollidingSourceFiles extends ScanningRecipe<FindCollidingSourceFiles.Accumulator> {

transient CollidingSourceFiles collidingSourceFiles = new CollidingSourceFiles(this);

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package org.openrewrite;

import lombok.Getter;
import org.jspecify.annotations.Nullable;

import java.time.Duration;
Expand All @@ -23,16 +24,17 @@
import java.util.function.BiConsumer;
import java.util.function.Consumer;

public class InMemoryExecutionContext implements ExecutionContext {
private final Map<String, Object> messages = new ConcurrentHashMap<>();
public class InMemoryExecutionContext implements ExecutionContext, Cloneable {
@Getter
@Nullable
private Map<String, Object> messages;

private final Consumer<Throwable> onError;
private final BiConsumer<Throwable, ExecutionContext> onTimeout;

public InMemoryExecutionContext() {
this(
t -> {
}
);
this(t -> {
});
}

public InMemoryExecutionContext(Consumer<Throwable> onError) {
Expand All @@ -52,23 +54,31 @@ public InMemoryExecutionContext(Consumer<Throwable> onError, Duration runTimeout

@Override
public void putMessage(String key, @Nullable Object value) {
if (value == null) {
if (value == null && messages != null) {
messages.remove(key);
} else {
messages.put(key, value);
if (messages == null) {
messages = new ConcurrentHashMap<>();
}
if (value != null) {
messages.put(key, value);
}
}
}

@Override
public <T> @Nullable T getMessage(String key) {
if (messages == null) {
messages = new ConcurrentHashMap<>();
}
//noinspection unchecked
return (T) messages.get(key);
}

@Override
public <T> @Nullable T pollMessage(String key) {
//noinspection unchecked
return (T) messages.remove(key);
return (T) (messages == null ? null : messages.remove(key));
}

@Override
Expand All @@ -80,4 +90,14 @@ public Consumer<Throwable> getOnError() {
public BiConsumer<Throwable, ExecutionContext> getOnTimeout() {
return onTimeout;
}

@SuppressWarnings("MethodDoesntCallSuperMethod")
@Override
public InMemoryExecutionContext clone() {
InMemoryExecutionContext clone = new InMemoryExecutionContext();
clone.messages = new ConcurrentHashMap<>(messages);
clone.messages.computeIfPresent(DATA_TABLES, (key, dt) ->
new ConcurrentHashMap<>(((Map<?, ?>) dt)));
return clone;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ default void beforeCycle(boolean definitelyLastCycle) {
* @param map A transformation on T
* @return A new source set if the map function results in any changes, otherwise this source set is returned.
*/
LargeSourceSet edit(UnaryOperator<SourceFile> map);
LargeSourceSet edit(UnaryOperator<@Nullable SourceFile> map);

/**
* Concatenate new items. Where possible, implementations should not iterate the entire source set in order
Expand Down
Loading

0 comments on commit 4f0f666

Please sign in to comment.