1. Purchase HBase 1.1 and set up the corresponding resources
1.1 Purchase HBase
The two major HBase versions offered are 2.0 and 1.1; here we choose HBase 1.1.
Differences between the HBase 1.1 and HBase 2.0 editions
HBase 1.1 edition
The 1.1 edition is developed on top of the community HBase 1.1.2 release.
HBase 2.0 edition
The 2.0 edition is a brand-new edition based on the community HBase 2.0.0 release published in 2018. On top of that, it adds a large number of improvements and optimizations and incorporates extensive production experience from inside Alibaba, so it offers better stability and performance than the community HBase release.
1.2 Confirm the VPC ID and vSwitch ID
To keep the connectivity test simple and feasible, the VPC ID and vSwitch ID of this HBase instance should, as far as possible, match those of the purchased exclusive Data Integration resource group. For details on the exclusive Data Integration resource group, see https://help.aliyun.com/document_detail/137838.html
1.3 Set the HBase whitelist; the DataWorks whitelist entries are listed below, and your own ECS address can be added as well
Add the whitelist entries for the DataWorks region you use according to https://help.aliyun.com/document_detail/137792.html
1.4 Check the HBase version and access address
Open the database connection page to see the major HBase version, the VPC (private network) access address, and whether public network access has been enabled for the connection.
2. Install the Phoenix client, create a table, and insert data
2.1 Install the client
Since the HBase version is 1.1, choose Phoenix 4.12.0 and, following https://help.aliyun.com/document_detail/53600.html , download the corresponding client package ali-phoenix-4.12.0-AliHBase-1.1-0.9.tar.gz
Log in to the client and run the command:
./bin/sqlline.py 172.16.0.13,172.16.0.15,172.16.0.12:2181
Create the table (Phoenix requires a primary key and uses VARCHAR rather than STRING):
CREATE TABLE IF NOT EXISTS users_phonix
(
    id INTEGER NOT NULL PRIMARY KEY,
    username VARCHAR,
    password VARCHAR
);
Insert data:
UPSERT INTO users_phonix (id, username, password) VALUES (1, 'admin', 'Letmein');
2.2 Check that the table was created and the data inserted successfully
Run the following query in the client to check whether the table exists and the data was written:
select * from users_phonix;
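If you prefer to verify from code instead of sqlline, the following is a minimal sketch of the same check over JDBC. It assumes the Phoenix JDBC driver shipped in the downloaded client package is on the classpath and reuses the ZooKeeper quorum from the command above; the object name PhoenixSmokeCheck is only an illustration, so adjust everything to your own environment.
import java.sql.DriverManager

object PhoenixSmokeCheck {
  def main(args: Array[String]): Unit = {
    // Phoenix JDBC URL format: jdbc:phoenix:<zk quorum>:<zk port>
    val url = "jdbc:phoenix:172.16.0.13,172.16.0.15,172.16.0.12:2181"
    val conn = DriverManager.getConnection(url)
    try {
      // Read back the row upserted above
      val rs = conn.createStatement().executeQuery("SELECT id, username, password FROM users_phonix")
      while (rs.next()) {
        println(s"${rs.getInt("id")} ${rs.getString("username")} ${rs.getString("password")}")
      }
    } finally {
      conn.close()
    }
  }
}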
3. Write the corresponding code logic
3.1 Write the code logic
Set up the local development environment in IDEA according to the POM file below, fill in all of the configuration information referenced by the code, and then write and test it. You can first use the HBase public network endpoint for testing; once the code logic is verified, adjust the configuration parameters accordingly. The code is as follows:
package com.git.phonix
import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.SparkSession
import org.apache.phoenix.spark._
/**
 * This example applies to Phoenix 4.x versions.
 */
object SparkOnPhoenix4xSparkSession {
def main(args: Array[String]): Unit = {
//ZooKeeper connection address of the HBase cluster.
//Format: xxx-002.hbase.rds.aliyuncs.com,xxx-001.hbase.rds.aliyuncs.com,xxx-003.hbase.rds.aliyuncs.com:2181
val zkAddress = args(0)
//Table name on the Phoenix side; the table must be created in Phoenix in advance. For creating Phoenix tables, see: https://help.aliyun.com/document_detail/53716.html?spm=a2c4g.11186623.4.2.4e961ff0lRqHUW
val phoenixTableName = args(1)
//Table name on the Spark side (the MaxCompute table).
val ODPSTableName = args(2)
val sparkSession = SparkSession
.builder()
.appName("SparkSQL-on-MaxCompute")
.config("spark.sql.broadcastTimeout", 20 * 60)
.config("spark.sql.crossJoin.enabled", true)
.config("odps.exec.dynamic.partition.mode", "nonstrict")
//.config("spark.master", "local[4]") // 需設置spark.master為local[N]才能直接運行,N為併發數
.config("spark.hadoop.odps.project.name", "***")
.config("spark.hadoop.odps.access.id", "***")
.config("spark.hadoop.odps.access.key", "***")
//.config("spark.hadoop.odps.end.point", "http://service.cn.maxcompute.aliyun.com/api")
.config("spark.hadoop.odps.end.point", "http://service.cn-beijing.maxcompute.aliyun-inc.com/api")
.config("spark.sql.catalogImplementation", "odps")
.getOrCreate()
//First write method: read the Phoenix table into a DataFrame and write it into the MaxCompute table.
val df = sparkSession.read.format("org.apache.phoenix.spark").option("table", phoenixTableName).option("zkUrl", zkAddress).load()
df.show()
df.write.mode("overwrite").insertInto(ODPSTableName)
}
}
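The comment above marks this as the first write method. A second option, shown below as a sketch of my own rather than part of the original sample, is to register the Phoenix DataFrame as a temporary view and write through Spark SQL, which is convenient when you want to reshape or filter the columns on the way into MaxCompute. The helper name writeViaSparkSql and the view name phoenix_users are illustrative; df, sparkSession, and ODPSTableName refer to the values defined in main() above.
// Alternative write path (sketch): register the Phoenix DataFrame as a temporary view
// and insert through Spark SQL. The column list matches the users_phonix schema.
def writeViaSparkSql(sparkSession: org.apache.spark.sql.SparkSession,
                     df: org.apache.spark.sql.DataFrame,
                     odpsTableName: String): Unit = {
  df.createOrReplaceTempView("phoenix_users")
  sparkSession.sql(
    s"""
       |INSERT OVERWRITE TABLE $odpsTableName
       |SELECT id, username, password FROM phoenix_users
     """.stripMargin)
}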
3.2 The corresponding POM file
The POM file is split into the Spark dependencies and the ali-phoenix-spark related dependencies. Because the ODPS jars are involved and would cause jar conflicts on the cluster, the ODPS packages must be excluded.
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. See accompanying LICENSE file.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<properties>
<spark.version>2.3.0</spark.version>
<cupid.sdk.version>3.3.8-public</cupid.sdk.version>
<scala.version>2.11.8</scala.version>
<scala.binary.version>2.11</scala.binary.version>
<phoenix.version>4.12.0-HBase-1.1</phoenix.version>
</properties>
<groupId>com.aliyun.odps</groupId>
<artifactId>Spark-Phonix</artifactId>
<version>1.0.0-SNAPSHOT</version>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.jpmml</groupId>
<artifactId>pmml-model</artifactId>
<version>1.3.8</version>
</dependency>
<dependency>
<groupId>org.jpmml</groupId>
<artifactId>pmml-evaluator</artifactId>
<version>1.3.10</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
</exclusion>
<exclusion>
<groupId>org.scala-lang</groupId>
<artifactId>scalap</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.aliyun.odps</groupId>
<artifactId>cupid-sdk</artifactId>
<version>${cupid.sdk.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.aliyun.phoenix</groupId>
<artifactId>ali-phoenix-core</artifactId>
<version>4.12.0-AliHBase-1.1-0.8</version>
<exclusions>
<exclusion>
<groupId>com.aliyun.odps</groupId>
<artifactId>odps-sdk-mapred</artifactId>
</exclusion>
<exclusion>
<groupId>com.aliyun.odps</groupId>
<artifactId>odps-sdk-commons</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.aliyun.phoenix</groupId>
<artifactId>ali-phoenix-spark</artifactId>
<version>4.12.0-AliHBase-1.1-0.8</version>
<exclusions>
<exclusion>
<groupId>com.aliyun.phoenix</groupId>
<artifactId>ali-phoenix-core</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<minimizeJar>false</minimizeJar>
<shadedArtifactAttached>true</shadedArtifactAttached>
<artifactSet>
<includes>
<!-- Include here the dependencies you
want to be packed in your fat jar -->
<include>*:*</include>
</includes>
</artifactSet>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
<exclude>**/log4j.properties</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>reference.conf</resource>
</transformer>
<transformer
implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>META-INF/services/org.apache.spark.sql.sources.DataSourceRegister</resource>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.3.2</version>
<executions>
<execution>
<id>scala-compile-first</id>
<phase>process-resources</phase>
<goals>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>scala-test-compile-first</id>
<phase>process-test-resources</phase>
<goals>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
4. Package, upload to DataWorks, and run a smoke test
4.1 Create the target MaxCompute table
CREATE TABLE IF NOT EXISTS users_phonix
(
id INT ,
username STRING,
password STRING
) ;
4.2 Package and upload to MaxCompute
In IDEA, build a shaded jar so that all dependencies are packed into the jar. Because uploading a jar through the DataWorks web interface is limited to 50 MB, use the MaxCompute client to upload the jar instead.
4.3 Select the corresponding project environment, check the uploaded resource, and add it to data development
In the DataWorks console, click the resource icon on the left, select the corresponding environment as the development environment, and search for the uploaded jar by its file name. The list shows that the resource has been uploaded successfully; click "Submit to data development".
Click the Submit button.
4.4 Configure the vpcList parameter and submit the task for testing
The configuration of the vpcList parameter is shown below; fill it in according to your own HBase connection details.
{
"regionId":"cn-beijing",
"vpcs":[
{
"vpcId":"vpc-2ze7cqx2bqodp9ri1vvvk",
"zones":[
{
"urls":[
{
"domain":"172.16.0.12",
"port":2181
},
{
"domain":"172.16.0.13",
"port":2181
},
{
"domain":"172.16.0.15",
"port":2181
},
{
"domain":"172.16.0.14",
"port":2181
},
{
"domain":"172.16.0.12",
"port":16000
},
{
"domain":"172.16.0.13",
"port":16000
},
{
"domain":"172.16.0.15",
"port":16000
},
{
"domain":"172.16.0.14",
"port":16000
},
{
"domain":"172.16.0.12",
"port":16020
},
{
"domain":"172.16.0.13",
"port":16020
},
{
"domain":"172.16.0.15",
"port":16020
},
{
"domain":"172.16.0.14",
"port":16020
}
]
}
]
}
]
}
Configure the Spark task's submission parameters: the main class and the corresponding arguments.
There are three arguments: the first is the Phoenix (ZooKeeper) connection address, the second is the Phoenix table name, and the third is the target MaxCompute table.
Click the smoke test button; the task executes successfully.
Run a query in an ad-hoc query node to confirm that the data has been written into the MaxCompute table.
Summary:
Using Spark on MaxCompute to access Phoenix data and write it into a MaxCompute table has been verified in practice to be feasible. A few things to watch out for:
1. Choose the HBase and Phoenix versions according to your actual situation and keep them consistent; the client to use and the code dependencies change with the versions.
2. When testing locally in IDEA over the public network, mind the HBase whitelist: besides the DataWorks whitelist, your own local address must also be added.
3. When packaging, sort out the dependencies in the POM so that ODPS jars are not pulled in transitively and cause jar conflicts, and build a shaded jar so that no required dependency is missing.