@@ -266,7 +266,7 @@ public static void registerUserDefinedFunction(SqlTree sqlTree, List<URL> jarUrl
266
266
*/
267
267
public static Set <URL > registerTable (SqlTree sqlTree , StreamExecutionEnvironment env , StreamTableEnvironment tableEnv , String localSqlPluginPath ,
268
268
String remoteSqlPluginPath , String pluginLoadMode , Map <String , AbstractSideTableInfo > sideTableMap , Map <String , Table > registerTableCache ) throws Exception {
269
- Set <URL > pluginClassPatshSets = Sets .newHashSet ();
269
+ Set <URL > pluginClassPathSets = Sets .newHashSet ();
270
270
WaterMarkerAssigner waterMarkerAssigner = new WaterMarkerAssigner ();
271
271
for (AbstractTableInfo tableInfo : sqlTree .getTableInfoMap ().values ()) {
272
272
@@ -302,26 +302,26 @@ public static Set<URL> registerTable(SqlTree sqlTree, StreamExecutionEnvironment
302
302
registerTableCache .put (tableInfo .getName (), regTable );
303
303
304
304
URL sourceTablePathUrl = PluginUtil .buildSourceAndSinkPathByLoadMode (tableInfo .getType (), AbstractSourceTableInfo .SOURCE_SUFFIX , localSqlPluginPath , remoteSqlPluginPath , pluginLoadMode );
305
- pluginClassPatshSets .add (sourceTablePathUrl );
305
+ pluginClassPathSets .add (sourceTablePathUrl );
306
306
} else if (tableInfo instanceof AbstractTargetTableInfo ) {
307
307
308
308
TableSink tableSink = StreamSinkFactory .getTableSink ((AbstractTargetTableInfo ) tableInfo , localSqlPluginPath );
309
309
TypeInformation [] flinkTypes = FunctionManager .transformTypes (tableInfo .getFieldClasses ());
310
310
tableEnv .registerTableSink (tableInfo .getName (), tableInfo .getFields (), flinkTypes , tableSink );
311
311
312
312
URL sinkTablePathUrl = PluginUtil .buildSourceAndSinkPathByLoadMode (tableInfo .getType (), AbstractTargetTableInfo .TARGET_SUFFIX , localSqlPluginPath , remoteSqlPluginPath , pluginLoadMode );
313
- pluginClassPatshSets .add (sinkTablePathUrl );
313
+ pluginClassPathSets .add (sinkTablePathUrl );
314
314
} else if (tableInfo instanceof AbstractSideTableInfo ) {
315
315
String sideOperator = ECacheType .ALL .name ().equals (((AbstractSideTableInfo ) tableInfo ).getCacheType ()) ? "all" : "async" ;
316
316
sideTableMap .put (tableInfo .getName (), (AbstractSideTableInfo ) tableInfo );
317
317
318
318
URL sideTablePathUrl = PluginUtil .buildSidePathByLoadMode (tableInfo .getType (), sideOperator , AbstractSideTableInfo .TARGET_SUFFIX , localSqlPluginPath , remoteSqlPluginPath , pluginLoadMode );
319
- pluginClassPatshSets .add (sideTablePathUrl );
319
+ pluginClassPathSets .add (sideTablePathUrl );
320
320
} else {
321
321
throw new RuntimeException ("not support table type:" + tableInfo .getType ());
322
322
}
323
323
}
324
- return pluginClassPatshSets ;
324
+ return pluginClassPathSets ;
325
325
}
326
326
327
327
/**
0 commit comments