java Spark 读取hbase数据
2021-04-12 09:28
标签:apr ESS rri faq val creat post collect bean 引用的jar包(maven) 方式一: 注意类中不能有其他方法自动注解方式不然报 方式二: java Spark 读取hbase数据 标签:apr ESS rri faq val creat post collect bean 原文地址:https://www.cnblogs.com/Mr-lin66/p/13355083.html
org.apache.spark.SparkException: Task not serializable
/**
* 必须序化使用
* */
@Component
public class SparkOnHbaseTest implements Serializable {
public void getHbase() {
SparkSession spark = SparkSession.builder().master("local[*]").appName("HBASEDATA")
.getOrCreate();
JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
String tableName = "users";
String FAMILY = "personal";
String COLUM_ID = "id";
String COLUM_NAME = "name";
String COLUM_PHONE = "phone";
// Hbase配置
Configuration hconf = HBaseConfiguration.create();
hconf.set("hbase.zookeeper.quorum", "192.168.0.124");
hconf.set("hbase.zookeeper.property.clientPort", "9095");
hconf.set(TableInputFormat.INPUT_TABLE, tableName);
//
Scan scan = new Scan();
scan.addFamily(Bytes.toBytes(FAMILY));
scan.addColumn(Bytes.toBytes(FAMILY), Bytes.toBytes(COLUM_ID));
scan.addColumn(Bytes.toBytes(FAMILY), Bytes.toBytes(COLUM_NAME));
scan.addColumn(Bytes.toBytes(FAMILY), Bytes.toBytes(COLUM_PHONE));
try {
//添加scan
ClientProtos.Scan proto = ProtobufUtil.toScan(scan);
String ScanToString = Base64.encodeBytes(proto.toByteArray());
hconf.set(TableInputFormat.SCAN, ScanToString);
//读HBase数据转化成RDD
JavaPairRDD
/**
* 可以不序化使用
* */
@Component
public class HbaseGetData {
@Resource
private HbaseTemplate hbaseTemplate;
@Resource
private JavaSparkContext sc;
/**
* 可以成功获取数据
**/
public void getData_1() {
String tableName = "users";
String FAMILY = "personal";
String COLUM_ID = "id";
String COLUM_NAME = "name";
String COLUM_PHONE = "phone";
// Hbase配置
Configuration hconf = HBaseConfiguration.create(hbaseTemplate.getConfiguration());
hconf.set(TableInputFormat.INPUT_TABLE, tableName);
hconf.set(TableInputFormat.SCAN_COLUMNS, "personal:name personal:phone personal:id");
try {
//读HBase数据转化成RDD
JavaPairRDD
hbaseRDD.cache();// 对myRDD进行缓存
long count = hbaseRDD.count();
System.out.println("数据总条数:" + count);
List
System.out.println("list size---"+list.size());
for(Result result:list){
List
System.out.println(Bytes.toString(CellUtil.cloneRow(cells.get(0))));
for(Cell cell:cells){
System.out.println(Bytes.toString(CellUtil.cloneValue(cell)));
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
}
}
}
/**
* 对hbase 的 DDL、DML操作或者使用HBaseUtils
*
* @Author wulincheng
* @Date 2020-7-13 15:14:08
* @Version 1.0
*/
public class HbaseTemplate{
private Logger log = LoggerFactory.getLogger(this.getClass());
/**
* hbase连接对象
*/
private Connection connection;
private org.apache.hadoop.conf.Configuration configuration;
public HbaseTemplate() {
}
// public HbaseTemplate(Connection connection) {
// setConnection(connection);
// }
@PostConstruct
private void init() throws IOException {
setConnection(ConnectionFactory.createConnection(configuration));
}
public HbaseTemplate(org.apache.hadoop.conf.Configuration configuration) throws IOException {
setConfiguration(configuration);
}
public Connection getConnection() {
return connection;
}
private Admin getAdmin() throws IOException {
return connection.getAdmin();
}
public void setConnection(Connection connection) {
this.connection = connection;
}
public Configuration getConfiguration() {
return configuration;
}
public void setConfiguration(Configuration configuration) {
this.configuration = configuration;
}
}/**
* 读取hbase配置文件
* @author wulincheng
* @date 2020-7-14 12:11:18
* */
@ConfigurationProperties(prefix = "hbase")
public class HBaseProperties {
private Map
public Map
return config;
}
public void setConfig(Map
this.config = config;
}
}/**
* HBase配置类
* @author wulincheng
* @date 2020-7-14 12:11:18
* https://hbase.apache.org/book.html#faq 官网的
* http://c.biancheng.net/view/6523.html hbase用法参考
* */
@Configuration
@EnableConfigurationProperties(HBaseProperties.class)
public class HBaseConfig {
private final HBaseProperties properties;
public HBaseConfig(HBaseProperties properties) {
this.properties = properties;
}
@Bean
public HbaseTemplate hbaseTemplate() {
// Connection connection = null;
// try {
// connection = ConnectionFactory.createConnection(configuration());
// } catch (IOException e) {
// e.printStackTrace();
// }
// return new HbaseTemplate(configuration());
// new HbaseTemplate(connection);
HbaseTemplate hbaseTemplate = new HbaseTemplate();
hbaseTemplate.setConfiguration(configuration());
// hbaseTemplate.setAutoFlush(true);
return hbaseTemplate;
}
@Bean
public Admin admin() {
Admin admin = null;
try {
Connection connection = ConnectionFactory.createConnection(configuration());
admin = connection.getAdmin();
} catch (IOException e) {
e.printStackTrace();
}
return admin;
}
public org.apache.hadoop.conf.Configuration configuration() {
org.apache.hadoop.conf.Configuration configuration = HBaseConfiguration.create();
Map
Set
for (String key : keySet) {
configuration.set(key, config.get(key));
}
return configuration;
}
}@Configuration
@ConfigurationProperties(prefix="spark")
public class SparkContextBean implements Serializable {
//spark的安装地址
private String sparkHome = "";
//应用的名称
private String appName = "";
//master的地址
private String master = "";
@Bean
@ConditionalOnMissingBean(SparkConf.class)
public SparkConf sparkConf() throws Exception {
SparkConf conf = new SparkConf()
.setSparkHome(sparkHome)
.setAppName(appName)
.setMaster(master);
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
return conf;
}
@Bean
@ConditionalOnMissingBean(JavaSparkContext.class)
public JavaSparkContext javaSparkContext() throws Exception {
return new JavaSparkContext(sparkConf());
}
public String getSparkHome() {
return sparkHome;
}
public void setSparkHome(String sparkHome) {
this.sparkHome = sparkHome;
}
public String getAppName() {
return appName;
}
public void setAppName(String appName) {
this.appName = appName;
}
public String getMaster() {
return master;
}
public void setMaster(String master) {
this.master = master;
}
}