A SparkSession client: configuration and lazy initialization
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Serializable;

public class SparkTool implements Serializable {

    private static final Logger LOGGER = LoggerFactory.getLogger(SparkTool.class);

    public static String appName = "root";

    private static JavaSparkContext jsc = null;
    private static SparkSession spark = null;

    // Lazily build a single Hive-enabled SparkSession and the JavaSparkContext derived
    // from it; synchronized so concurrent callers cannot create two sessions.
    private static synchronized void initSpark() {
        if (jsc == null || spark == null) {
            SparkConf sparkConf = new SparkConf();
            sparkConf.set("spark.driver.allowMultipleContexts", "true");
            sparkConf.set("spark.eventLog.enabled", "true");
            sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
            sparkConf.set("spark.hadoop.validateOutputSpecs", "false");
            // Allow Hive tables whose data sits in nested subdirectories to be read recursively.
            sparkConf.set("hive.mapred.supports.subdirectories", "true");
            sparkConf.set("mapreduce.input.fileinputformat.input.dir.recursive", "true");
            spark = SparkSession.builder()
                    .appName(appName)
                    .config(sparkConf)
                    .enableHiveSupport()
                    .getOrCreate();
            jsc = new JavaSparkContext(spark.sparkContext());
        }
    }

    public static JavaSparkContext getJsc() {
        if (jsc == null) {
            initSpark();
        }
        return jsc;
    }

    public static SparkSession getSession() {
        if (spark == null) {
            initSpark();
        }
        return spark;
    }
}
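For reference, a minimal usage sketch of the client above. The table name demo_db.demo_table is a placeholder I introduce for illustration, not something from the original code:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class SparkToolExample {
    public static void main(String[] args) {
        // Reuse the lazily initialized singleton session.
        SparkSession spark = SparkTool.getSession();
        // Any Hive-visible table works here; demo_db.demo_table is only a placeholder.
        Dataset<Row> df = spark.sql("SELECT * FROM demo_db.demo_table LIMIT 10");
        df.show();
        spark.stop();
    }
}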
Executing SQL through the SparkSession
// Requires org.apache.spark.sql.Dataset, org.apache.spark.sql.Row,
// java.util.List and java.util.ArrayList.
public List<TableInfo> selectTableInfoFromSpark(String abstractSql) {
    List<TableInfo> tableInfoList = new ArrayList<>();
    SparkSession spark = SparkTool.getSession();
    Dataset<Row> dataset = spark.sql(abstractSql);
    List<Row> rowList = dataset.collectAsList();
    for (Row row : rowList) {
        // Create a new TableInfo per row; reusing a single instance would leave every
        // list entry pointing at the same object holding the last row's values.
        TableInfo tableInfo = new TableInfo();
        // Row fields are zero-indexed; these positions assume the query returns
        // column name, type and comment as its first three columns.
        tableInfo.setColumnName(row.getString(0));
        tableInfo.setColumnType(row.getString(1));
        tableInfo.setColumnComment(row.getString(2));
        tableInfoList.add(tableInfo);
    }
    return tableInfoList;
}
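The TableInfo bean is not shown here; a minimal sketch of what the setters above imply (field names inferred, not taken from the original post) could look like:

import java.io.Serializable;

public class TableInfo implements Serializable {
    private String columnName;
    private String columnType;
    private String columnComment;

    public String getColumnName() { return columnName; }
    public void setColumnName(String columnName) { this.columnName = columnName; }

    public String getColumnType() { return columnType; }
    public void setColumnType(String columnType) { this.columnType = columnType; }

    public String getColumnComment() { return columnComment; }
    public void setColumnComment(String columnComment) { this.columnComment = columnComment; }
}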
When querying with spark-sql from Java or Scala, results come back in one of three shapes: RDD, DataFrame, or Dataset. The relationships between these three structures, and how to convert and parse them, are covered in this post: https://www.jianshu.com/p/71003b152a84
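As a quick illustration of those conversions in Java, a sketch using standard Spark APIs; it assumes the TableInfo bean sketched above and the placeholder table demo_db.demo_table, and the selected column names are assumed to match the bean's fields:

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class ConversionExample {
    public static void main(String[] args) {
        SparkSession spark = SparkTool.getSession();

        // In Java, a DataFrame is simply Dataset<Row>.
        Dataset<Row> df = spark.sql(
                "SELECT columnName, columnType, columnComment FROM demo_db.demo_table");

        // DataFrame -> typed Dataset via a bean encoder (column names must match the bean's fields).
        Dataset<TableInfo> ds = df.as(Encoders.bean(TableInfo.class));

        // DataFrame / Dataset -> RDD.
        JavaRDD<Row> rowRdd = df.javaRDD();
        JavaRDD<TableInfo> beanRdd = ds.javaRDD();

        // RDD -> DataFrame / Dataset again.
        Dataset<Row> backToDf = spark.createDataFrame(beanRdd, TableInfo.class);
        Dataset<TableInfo> backToDs = spark.createDataset(beanRdd.rdd(), Encoders.bean(TableInfo.class));

        backToDs.show();
    }
}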