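I have already tried Janino, and I can run a dynamic class by passing its source as a String to the Janino compiler.
Now I need to create several classes dynamically and then import them into another dynamic class.
An example:
- Compile class A from a String
- Compile class B from a String
- Create class C, containing "import mypackage.A; import mypackage.B; etc..."
- Compile class C
How can I achieve this?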
My first dynamic class is:
package com.Flink.POJO;

public class RainPOJO {
    private String Altitude;
    private String City_name;
    private String Latitude;
    private String Longitude;
    private String Rainfall;
    private String Station_name;
    private String Time;

    public String getAltitude() {
        return Altitude;
    }
    public void setAltitude(String Altitude) {
        this.Altitude = Altitude;
    }
    public String getCity_name() {
        return City_name;
    }
    public void setCity_name(String City_name) {
        this.City_name = City_name;
    }
    public String getLatitude() {
        return Latitude;
    }
    public void setLatitude(String Latitude) {
        this.Latitude = Latitude;
    }
    public String getLongitude() {
        return Longitude;
    }
    public void setLongitude(String Longitude) {
        this.Longitude = Longitude;
    }
    public String getRainfall() {
        return Rainfall;
    }
    public void setRainfall(String Rainfall) {
        this.Rainfall = Rainfall;
    }
    public String getStation_name() {
        return Station_name;
    }
    public void setStation_name(String Station_name) {
        this.Station_name = Station_name;
    }
    public String getTime() {
        return Time;
    }
    public void setTime(String Time) {
        this.Time = Time;
    }
}
I then have to use that class from the following dynamically generated class:
import org.apache.flink.api.common.functions.MapFunction;
import java.util.ArrayList;
import com.google.gson.JsonObject;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.common.operators.base.JoinOperatorBase;
import com.Flink.POJO.RainPOJO;

public class FlinkExecutor {
    public FlinkExecutor() {}

    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setDegreeOfParallelism(1);
        Source Rain = new Source("sensor", "rain");
        String path_Rain = Rain.getCSVPath();
        DataSet<RainPOJO> ds_s1 = env.readCsvFile("file://" + path_Rain)
            .ignoreFirstLine()
            .pojoType(RainPOJO.class, "table", "time", "longitude", "latitude", "average_rainfall", "maximum_rainfall");
        ds_s1.map(new MapRain(ds_s1.count()))
            .print();
    }
}
To compile the first class I use:
SimpleCompiler compiler = new SimpleCompiler();
compiler.cook(p_class);
ClassLoader classloader = compiler.getClassLoader();
try {
    Class<?> cl = classloader.loadClass("com.Flink.POJO.RainPOJO");
} catch (ClassNotFoundException e1) {
    e1.printStackTrace();
}
And for the second class, which contains the main method:
SimpleCompiler compiler = new SimpleCompiler();
compiler.cook(this.allClass);
try {
    Class<?> cl = compiler.getClassLoader().loadClass("FlinkExecutor");
    Method mainMeth = cl.getMethod("main", new Class[] { String[].class });
    String[] methArgs = new String[] { "" }; // one input
    mainMeth.invoke(null, new Object[] { methArgs });
} catch (ClassNotFoundException | NoSuchMethodException | SecurityException | IllegalAccessException | IllegalArgumentException e) {
    e.printStackTrace();
}
The error I get is:
org.codehaus.commons.compiler.CompileException: Line 8, Column 7: A class 'com.Flink.POJO.RainPOJO' could not be found
Janino cannot find the class named in the import declaration.
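One approach that may work for the compile-A-then-C workflow described earlier, given here only as a minimal sketch: hand the first compiler's class loader to the second SimpleCompiler through setParentClassLoader before calling cook, so that the import can be resolved against the class compiled earlier. The classes A and C, their source strings, and the ChainedJaninoCompile wrapper are hypothetical stand-ins for illustration, not the generated Flink classes in this post.

```java
import org.codehaus.janino.SimpleCompiler;

public class ChainedJaninoCompile {

    // Hypothetical source for the class that gets imported (stands in for class A).
    static final String SOURCE_A =
        "package mypackage;\n"
      + "public class A {\n"
      + "    public String name() { return \"A\"; }\n"
      + "}\n";

    // Hypothetical source for the importing class (stands in for class C).
    static final String SOURCE_C =
        "import mypackage.A;\n"
      + "public class C {\n"
      + "    public static String describe() { return \"C uses \" + new A().name(); }\n"
      + "}\n";

    /** Compiles A, then compiles C against A's class loader, and calls C.describe(). */
    public static String compileAndRun() throws Exception {
        // First compiler: after cook(), its class loader can load mypackage.A.
        SimpleCompiler first = new SimpleCompiler();
        first.cook(SOURCE_A);
        ClassLoader loaderWithA = first.getClassLoader();

        // Second compiler: the parent must be set before cook(), so that
        // "import mypackage.A" is looked up through loaderWithA.
        SimpleCompiler second = new SimpleCompiler();
        second.setParentClassLoader(loaderWithA);
        second.cook(SOURCE_C);

        // Load C from the second compiler's class loader and call its static method.
        Class<?> c = second.getClassLoader().loadClass("C");
        return (String) c.getMethod("describe").invoke(null);
    }

    public static void main(String[] args) throws Exception {
        System.out.println(compileAndRun());
    }
}
```

If this carries over to the real classes, the corresponding change would be to call setParentClassLoader on the compiler used for FlinkExecutor, passing the class loader that holds com.Flink.POJO.RainPOJO. Source and MapRain would presumably need to be reachable through the same loader chain.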
Thank you, Giacomo