How are partitions determined when MapReduce/Hive queries Phoenix data? A look at the PhoenixInputFormat source makes it clear:
public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
    Configuration configuration = context.getConfiguration();
    QueryPlan queryPlan = this.getQueryPlan(context, configuration);
    List<KeyRange> allSplits = queryPlan.getSplits();
    List<InputSplit> splits = this.generateSplits(queryPlan, allSplits);
    return splits;
}
A query plan, QueryPlan (in practice the subclass ScanPlan), is created from the SELECT statement. The getQueryPlan method performs one special step: queryPlan.iterator(MapReduceParallelScanGrouper.getInstance());
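The rough shape of getQueryPlan is sketched below (a simplified sketch, not the exact Phoenix source). It assumes the JDBC connection and the SELECT statement are pulled out of the job configuration through Phoenix's ConnectionUtil and PhoenixConfigurationUtil helpers:

private QueryPlan getQueryPlan(final JobContext context, final Configuration configuration) throws IOException {
    try {
        // Connection and SELECT statement come from the job configuration.
        final Connection connection = ConnectionUtil.getInputConnection(configuration);
        final String selectStatement = PhoenixConfigurationUtil.getSelectStatement(configuration);
        final Statement statement = connection.createStatement();
        final PhoenixStatement pstmt = statement.unwrap(PhoenixStatement.class);
        final QueryPlan queryPlan = pstmt.compileQuery(selectStatement);
        // The "special" step: initializing the iterator with MapReduceParallelScanGrouper
        // sets up the per-region parallel scans that later become the splits.
        queryPlan.iterator(MapReduceParallelScanGrouper.getInstance());
        return queryPlan;
    } catch (Exception e) {
        throw new IOException(e);
    }
}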
If the HBase table has multiple regions, the Scan is divided into several scans, one per region, and each region corresponds to one Split. The logic is similar to MapReduce on HBase; only the implementation differs, since it goes through Phoenix's QueryPlan rather than the HBase API. A sketch of how the splits are generated follows.
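Below is a hedged sketch (not the exact Phoenix source) of how generateSplits can map each region's scan list to one MapReduce split; it assumes PhoenixInputSplit, Phoenix's InputSplit implementation that wraps a list of scans:

private List<InputSplit> generateSplits(final QueryPlan queryPlan, final List<KeyRange> splits) throws IOException {
    // One PhoenixInputSplit per region, each carrying that region's scans.
    final List<InputSplit> inputSplits = new ArrayList<>(splits.size());
    for (List<Scan> regionScans : queryPlan.getScans()) {
        inputSplits.add(new PhoenixInputSplit(regionScans));
    }
    return inputSplits;
}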
The following example should make this process easier to follow.
Creating the Phoenix table
Pre-split the table into 4 regions: [-∞, CS), [CS, EU), [EU, NA), [NA, +∞)
CREATE TABLE TEST (HOST VARCHAR NOT NULL PRIMARY KEY, DESCRIPTION VARCHAR) SPLIT ON ('CS','EU','NA');
upsert into test(host, description) values ('CS11', 'cccccccc');
upsert into test(host, description) values ('EU11', 'eeeddddddddd');
upsert into test(host, description) values ('NA11', 'nnnnneeeddddddddd');
0: jdbc:phoenix:localhost> select * from test;
+-------+--------------------+
| HOST  |    DESCRIPTION     |
+-------+--------------------+
| CS11  | cccccccc           |
| EU11  | eeeddddddddd       |
| NA11  | nnnnneeeddddddddd  |
+-------+--------------------+
Peeking into ScanPlan
import org.apache.hadoop.hbase.client.Scan;
import org.apache.log4j.BasicConfigurator;
import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.iterate.MapReduceParallelScanGrouper;
import org.apache.phoenix.jdbc.PhoenixStatement;

import java.io.IOException;
import java.sql.*;
import java.util.List;

public class LocalPhoenix {
    public static void main(String[] args) throws SQLException, IOException {
        BasicConfigurator.configure();
        Statement stmt = null;
        ResultSet rs = null;

        Connection con = DriverManager.getConnection("jdbc:phoenix:localhost:2181:/hbase");
        stmt = con.createStatement();
        PhoenixStatement pstmt = (PhoenixStatement) stmt;
        // Compile the query into a QueryPlan (a ScanPlan for a plain SELECT).
        QueryPlan queryPlan = pstmt.optimizeQuery("select * from TEST");
        // Same "special" call as in PhoenixInputFormat: group the scans per region.
        queryPlan.iterator(MapReduceParallelScanGrouper.getInstance());
        Scan scan = queryPlan.getContext().getScan();
        List<List<Scan>> scans = queryPlan.getScans();

        for (List<Scan> sl : scans) {
            System.out.println();
            for (Scan s : sl) {
                System.out.print(s);
            }
        }
        con.close();
    }
}
The 4 scans are as follows:
{"loadColumnFamiliesOnDemand":null,"startRow":"","stopRow":"CS","batch":-1,"cacheBlocks":true,"totalColumns":1,"maxResultSize":-1,"families":{"0":["ALL"]},"caching":100,"maxVersions":1,"timeRange":[0,1523338217847]}{"loadColumnFamiliesOnDemand":null,"startRow":"CS","stopRow":"EU","batch":-1,"cacheBlocks":true,"totalColumns":1,"maxResultSize":-1,"families":{"0":["ALL"]},"caching":100,"maxVersions":1,"timeRange":[0,1523338217847]}{"loadColumnFamiliesOnDemand":null,"startRow":"EU","stopRow":"NA","batch":-1,"cacheBlocks":true,"totalColumns":1,"maxResultSize":-1,"families":{"0":["ALL"]},"caching":100,"maxVersions":1,"timeRange":[0,1523338217847]}{"loadColumnFamiliesOnDemand":null,"startRow":"NA","stopRow":"","batch":-1,"cacheBlocks":true,"totalColumns":1,"maxResultSize":-1,"families":{"0":["ALL"]},"caching":100,"maxVersions":1,"timeRange":[0,1523338217847]}Disconnected from the target VM, address: '127.0.0.1:63406', transport: 'socket'
Mapreduce atop Apache Phoenix (a first look at ScanPlan)
Original article: https://www.cnblogs.com/luweiseu/p/8783253.html