The ES utility class
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
public class ESUtils {
public static BulkProcessor.Listener getBulkListener() {
return new BulkProcessor.Listener() {
@Override
public void beforeBulk(long executionId, BulkRequest request) {
// invoked before each bulk request is executed
}
@Override
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
// invoked after a bulk request completes; check response.hasFailures() here
}
@Override
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
// invoked when a bulk request fails outright; the failure should at least be logged
}
};
}
public static ActionListener<BulkResponse> getActionListener() {
return new ActionListener<BulkResponse>() {
@Override
public void onResponse(BulkResponse bulkResponse) {
// invoked when an async bulk call succeeds
}
@Override
public void onFailure(Exception e) {
// invoked when an async bulk call fails; the exception should at least be logged
}
};
}
/**
 * Build a configured BulkProcessor.
 * @param client the high-level REST client that executes the bulk requests
 * @param listener callbacks fired before/after each bulk execution
 * @param bulkActions number of buffered actions that triggers a flush
 * @return the configured BulkProcessor
 */
public static BulkProcessor getBulkProcessor(RestHighLevelClient client, BulkProcessor.Listener listener, int bulkActions) {
BulkProcessor.Builder builder = BulkProcessor.builder(
(request, bulkListener) ->
client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener),
listener);
builder.setBulkActions(bulkActions); // flush after this many actions
builder.setBulkSize(new ByteSizeValue(1L, ByteSizeUnit.MB)); // ...or once the buffer reaches 1 MB
builder.setConcurrentRequests(0); // 0 = execute bulk requests synchronously
builder.setFlushInterval(TimeValue.timeValueSeconds(10L)); // ...or every 10 seconds
builder.setBackoffPolicy(BackoffPolicy
.constantBackoff(TimeValue.timeValueSeconds(1L), 3)); // retry rejected bulks up to 3 times, 1s apart
return builder.build(); // build from the configured builder so the settings take effect
}
}
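For reference, a minimal sketch of how the helper class above might be used on its own; ESUtilsDemo, the host, the index name and the document are hypothetical placeholders:
import org.apache.http.HttpHost;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import java.util.concurrent.TimeUnit;
public class ESUtilsDemo {
public static void main(String[] args) throws Exception {
// hypothetical single-node cluster without authentication
RestHighLevelClient client = new RestHighLevelClient(
RestClient.builder(new HttpHost("localhost", 9200, "http")));
BulkProcessor processor = ESUtils.getBulkProcessor(client, ESUtils.getBulkListener(), 100);
// queue a document; the processor flushes by action count, buffer size or interval
processor.add(new IndexRequest("demo_index").id("1")
.source("{\"field\":\"value\"}", XContentType.JSON));
// awaitClose() flushes what is left, waits for in-flight bulks and closes the processor
processor.awaitClose(30L, TimeUnit.SECONDS);
client.close();
}
}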
The BulkProcessor approach
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
/**
 * Much faster than calling RestHighLevelClient.bulk or client.bulkAsync directly.
 */
public class OneIdToEs {
private static final Logger LOG = LoggerFactory.getLogger(OneIdToEs.class);
public static void main(String[] args) {
if (args.length < 9) {
System.out.println("請輸入 hive的one_id表名稱 對應hive表唯一id字段 對應hive表的日期分區字段 對應hive表日期分區的值 寫入es索引名稱 es的Ip地址 es的端口號 es用戶名 es密碼 如:dwd.dwd_one_id_data_source_info_da exist_mark dt 2020-11-10 dwd_one_id_data_source_info_da 192.168.250.116 9200 zs zs123");
System.exit(1);
}
// argument list
// database.table
String hiveTableName = args[0];
String hiveTableUserIdName = args[1];
String hiveTablePartitionName = args[2];
String hiveTablePartitionValue = args[3];
String esTableName = args[4];
// ES connection settings
String esIp = args[5];
String esPort = args[6];
String esUser = args[7];
String esPass = args[8];
System.out.println("---------------------------------------------------spark start--------------------------------------------------------");
SparkSession spark = SparkSession
.builder()
.appName("ES_WGP_DATA")
.enableHiveSupport()
.getOrCreate();
String sql = "select * from " + hiveTableName +" where "+ hiveTablePartitionName +"=" +"'" +hiveTablePartitionValue+"'";
final Dataset<Row> inputData = spark.sql(sql);
LOG.warn("執行的sql語句 ---> " + sql);
LOG.warn("執行的sql語句的數據量是 ---> " + inputData.count());
inputData.show();
final String[] columns = inputData.columns();
JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
final Broadcast<String[]> broadcast = jsc.broadcast(columns);
inputData.javaRDD().foreachPartition(new VoidFunction<Iterator<Row>>() {
@Override
public void call(Iterator<Row> iterator) throws Exception {
// ES writer: one client per partition
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY,
new UsernamePasswordCredentials(esUser, esPass)); // ES credentials (default username is elastic)
RestHighLevelClient client = new RestHighLevelClient(
RestClient.builder(new HttpHost(esIp, Integer.parseInt(esPort), "http")).setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
httpClientBuilder.disableAuthCaching();
return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
}
})
);
final BulkProcessor.Listener listener = ESUtils.getBulkListener();
final BulkProcessor bulkProcessor = ESUtils.getBulkProcessor(client, listener, 100);
final String[] columns = broadcast.value();
int count = 0;
while (iterator.hasNext()) {
final Row row = iterator.next();
final String id = row.getAs(hiveTableUserIdName);
final XContentBuilder xContentBuilder = XContentFactory.jsonBuilder().startObject()
.field(hiveTableUserIdName, id);
for (String colName: columns) {
if (!colName.contentEquals(hiveTableUserIdName) ) {
Object obj = row.getAs(colName);
if (null == obj) {
obj = "-1";
}
xContentBuilder.field(colName, obj.toString());
}
}
xContentBuilder.endObject();
final UpdateRequest update = new UpdateRequest(esTableName, id).doc(xContentBuilder).upsert(); // see note 2 below: this bare upsert() does not take effect
bulkProcessor.add(update);
// final IndexRequest indexRequest = new IndexRequest(esTableName).id(id).source(xContentBuilder).opType(DocWriteRequest.OpType.CREATE);
// bulkProcessor.add(indexRequest);
count++;
if (count >= 100) {
bulkProcessor.flush(); // force a flush every 100 docs (bulkActions is also set to 100)
count = 0;
}
}
bulkProcessor.flush();
// awaitClose() waits for in-flight bulks and then closes the processor
bulkProcessor.awaitClose(30L, TimeUnit.SECONDS);
client.close();
}
});
}
}
The bulkAsync approach
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Iterator;
public class OneIdToEsUserPass {
private static final Logger LOG = LoggerFactory.getLogger(OneIdToEsUserPass.class);
public static void main(String[] args) {
if (args.length < 9) {
System.out.println("請輸入 hive的one_id表名稱 對應hive表唯一id字段 對應hive表的日期分區字段 對應hive表日期分區的值 寫入es索引名稱 es的Ip地址 es的端口號 es用戶名 es密碼 如:dwd.dwd_one_id_data_source_info_da exist_mark dt 2020-11-10 dwd_one_id_data_source_info_da 192.168.250.116 9200 zs zs123");
System.exit(1);
}
// argument list
// database.table
String hiveTableName = args[0];
String hiveTableUserIdName = args[1];
String hiveTablePartitionName = args[2];
String hiveTablePartitionValue = args[3];
String esTableName = args[4];
// ES connection settings
String esIp = args[5];
String esPort = args[6];
String esUser = args[7];
String esPass = args[8];
// Optional: drop the target index before a full reload.
// try {
// final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
// credentialsProvider.setCredentials(AuthScope.ANY,
// new UsernamePasswordCredentials(esUser, esPass)); // ES credentials (default username is elastic)
// RestHighLevelClient client = new RestHighLevelClient(
// RestClient.builder(new HttpHost(esIp, Integer.parseInt(esPort), "http")).setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
// public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
// httpClientBuilder.disableAuthCaching();
// return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
// }
// })
// );
// GetIndexRequest exist = new GetIndexRequest(esTableName);
// boolean exists = client.indices().exists(exist, RequestOptions.DEFAULT);
//
// if (exists) {
// DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(esTableName);
// AcknowledgedResponse deleteIndexResponse = client.indices().delete(deleteIndexRequest, RequestOptions.DEFAULT);
// boolean acknowledged = deleteIndexResponse.isAcknowledged();
// if (acknowledged) {
// LOG.warn("ES index deleted ---> " + esTableName);
// }
// }
// client.close();
// } catch (IOException e) {
// LOG.error("Failed to delete ES index --> ", e);
// }
System.out.println("---------------------------------------------------spark start--------------------------------------------------------");
SparkSession spark = SparkSession
.builder()
.appName("ES_WGP_DATA")
.enableHiveSupport()
.getOrCreate();
String sql = "select * from " + hiveTableName +" where "+ hiveTablePartitionName +"=" +"'" +hiveTablePartitionValue+"'";
final Dataset<Row> inputData = spark.sql(sql);
LOG.warn("執行的sql語句 ---> " + sql);
LOG.warn("執行的sql語句的數據量是 ---> " + inputData.count());
inputData.show();
final String[] columns = inputData.columns();
JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
final Broadcast<String[]> broadcast = jsc.broadcast(columns);
inputData.javaRDD().foreachPartition(new VoidFunction<Iterator<Row>>() {
@Override
public void call(Iterator<Row> iterator) throws Exception {
// ES writer: one client per partition
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY,
new UsernamePasswordCredentials(esUser, esPass)); // ES credentials (default username is elastic)
RestHighLevelClient client = new RestHighLevelClient(
RestClient.builder(new HttpHost(esIp, Integer.parseInt(esPort), "http")).setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
httpClientBuilder.disableAuthCaching();
return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
}
})
);
// listener for the async variant (commented out below)
// final ActionListener<BulkResponse> listener = ESUtils.getActionListener();
BulkRequest request = new BulkRequest();
request.timeout("3m");
final String[] columns = broadcast.value();
int count = 0;
while (iterator.hasNext()) {
final Row row = iterator.next();
final String id = row.getAs(hiveTableUserIdName);
final XContentBuilder xContentBuilder = XContentFactory.jsonBuilder().startObject()
.field(hiveTableUserIdName, id);
for (String colName: columns) {
if (!colName.contentEquals(hiveTableUserIdName) ) {
Object obj = row.getAs(colName);
if (null == obj) {
obj = "-1";
}
xContentBuilder.field(colName, obj.toString());
}
}
xContentBuilder.endObject();
final UpdateRequest update = new UpdateRequest(esTableName, id).doc(xContentBuilder).upsert(); // see note 2 below: this bare upsert() does not take effect
request.add(update);
// final IndexRequest indexRequest = new IndexRequest(esTableName).id(id).source(xContentBuilder).opType(DocWriteRequest.OpType.CREATE);
// request.add(indexRequest);
count++;
if (count >= 100) {
// client.bulkAsync(request, RequestOptions.DEFAULT, listener);
client.bulk(request, RequestOptions.DEFAULT);
// a BulkRequest keeps the actions already sent, so start a fresh one after each flush
request = new BulkRequest();
request.timeout("3m");
count = 0;
}
}
if (request.numberOfActions() > 0) { // an empty bulk request fails validation
client.bulk(request, RequestOptions.DEFAULT);
// client.bulkAsync(request, RequestOptions.DEFAULT, listener);
}
client.close();
}
});
}
}

Notes
1. With the first approach (BulkProcessor), updates written through UpdateRequest.doc show no explicit delete; with the second approach, the doc write does.
2. The call UpdateRequest update = new UpdateRequest(esTableName, id).doc(xContentBuilder).upsert() did not take effect; it should be:
// (older TransportClient-style API)
BulkRequestBuilder bulkRequest = esBase.getClient().prepareBulk();
for (int i = 0; i < data.size(); i++) {
T t = data.get(i);
String json = JSONObject.toJSONString(t);
IndexRequest indexRequest = new IndexRequest(index, type, t.get_id()).source(json).parent(parentIdList.get(i));
UpdateRequest updateRequest = new UpdateRequest(index, type, t.get_id()).parent(parentIdList.get(i)).doc(json).upsert(indexRequest);
bulkRequest.add(updateRequest);
}
BulkResponse bulkResponse = bulkRequest.execute().actionGet();
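With the RestHighLevelClient API used in the listings above, a minimal sketch of the equivalent fix; UpsertFix and buildUpsert are hypothetical names, and docAsUpsert(true) is an alternative that indexes the partial doc itself when the document does not exist yet:
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.common.xcontent.XContentBuilder;
public class UpsertFix {
// build an update that actually upserts, mirroring the arguments used in the listings above
static UpdateRequest buildUpsert(String esTableName, String id, XContentBuilder doc) {
// Option A: index the partial doc as-is when the document does not exist yet
return new UpdateRequest(esTableName, id).doc(doc).docAsUpsert(true);
// Option B, equivalent to note 2 on the new client (build a separate
// XContentBuilder for the upsert source rather than reusing doc):
// IndexRequest upsert = new IndexRequest(esTableName).id(id).source(upsertSource);
// return new UpdateRequest(esTableName, id).doc(doc).upsert(upsert);
}
}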