
Bulk-writing from Spark to ES7 with RestHighLevelClient

The ES utility class

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;

public class ESUtils {
    public static BulkProcessor.Listener getBulkListener() {
        return new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId, BulkRequest request) {
                // Called just before each bulk executes; request.numberOfActions() is the batch size.
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                // Called after a bulk completes; check response.hasFailures() for per-item errors.
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                // Called when a whole bulk fails (e.g. connection loss); the failure should be logged.
            }
        };
    }

    public static ActionListener<BulkResponse> getActionListener() {
        return new ActionListener<BulkResponse>() {
            @Override
            public void onResponse(BulkResponse bulkResponse) {
                // Invoked when an async bulk returns; inspect bulkResponse.hasFailures() here.
            }

            @Override
            public void onFailure(Exception e) {
                // Invoked when an async bulk fails outright; the exception should be logged.
            }
        };
    }


    /**
     * Builds a configured BulkProcessor.
     * @param client      the RestHighLevelClient used to send bulks
     * @param listener    callbacks fired before/after each bulk
     * @param bulkActions flush after this many accumulated actions
     * @return the configured BulkProcessor
     */
    public static BulkProcessor getBulkProcessor(RestHighLevelClient client, BulkProcessor.Listener listener, int bulkActions) {
        BulkProcessor.Builder builder = BulkProcessor.builder(
                (request, bulkListener) ->
                        client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener),
                listener);
        builder.setBulkActions(bulkActions);                          // flush by action count
        builder.setBulkSize(new ByteSizeValue(1L, ByteSizeUnit.MB));  // flush once the batch reaches 1 MB
        builder.setConcurrentRequests(0);                             // 0 = execute bulks synchronously
        builder.setFlushInterval(TimeValue.timeValueSeconds(10L));    // flush at least every 10 s
        builder.setBackoffPolicy(BackoffPolicy
                .constantBackoff(TimeValue.timeValueSeconds(1L), 3)); // retry rejected bulks 3 times, 1 s apart
        // Build from the configured builder so the settings above actually apply.
        return builder.build();
    }
}
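
For reference, here is a minimal sketch of how this utility class can be driven outside Spark; the host, port, index name, and field values are placeholders, not part of the original job:

import org.apache.http.HttpHost;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;

import java.util.Collections;
import java.util.concurrent.TimeUnit;

public class ESUtilsDemo {
    public static void main(String[] args) throws Exception {
        // Placeholder endpoint; substitute your own ES host and port.
        RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(new HttpHost("localhost", 9200, "http")));

        // Flush every 100 actions (plus the size/interval limits set in ESUtils).
        BulkProcessor bulkProcessor =
                ESUtils.getBulkProcessor(client, ESUtils.getBulkListener(), 100);

        // docAsUpsert(true) inserts the doc when the id does not exist yet.
        bulkProcessor.add(new UpdateRequest("demo_index", "doc-1")
                .doc(Collections.singletonMap("field", "value"))
                .docAsUpsert(true));

        // awaitClose flushes pending actions and shuts the processor down.
        bulkProcessor.awaitClose(30L, TimeUnit.SECONDS);
        client.close();
    }
}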

The BulkProcessor approach

import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Iterator;
import java.util.concurrent.TimeUnit;

/**
 * Writes a Hive table to ES through a BulkProcessor.
 * Noticeably faster than calling RestHighLevelClient.bulk or client.bulkAsync by hand,
 * since the processor batches actions and flushes them by count, size, and interval.
 */
public class OneIdToEs {
    private static final Logger LOG = LoggerFactory.getLogger(OneIdToEs.class);
    public static void main(String[] args) {
        if (args.length < 9) {
            System.out.println("請輸入 hive的one_id表名稱 對應hive表唯一id字段 對應hive表的日期分區字段 對應hive表日期分區的值 寫入es索引名稱 es的Ip地址 es的端口號 es用戶名 es密碼 如:dwd.dwd_one_id_data_source_info_da exist_mark dt 2020-11-10 dwd_one_id_data_source_info_da 192.168.250.116 9200 zs zs123");
            System.exit(1);
        }

        // Parameter list
        //database.table name
        String hiveTableName = args[0];
        String hiveTableUserIdName = args[1];
        String hiveTablePartitionName = args[2];
        String hiveTablePartitionValue = args[3];
        String esTableName = args[4];

        //es
        String esIp = args[5];
        String esPort = args[6];
        String esUser = args[7];
        String esPass= args[8];

        System.out.println("---------------------------------------------------spark start--------------------------------------------------------");
        SparkSession spark = SparkSession
                .builder()
                .appName("ES_WGP_DATA")
                .enableHiveSupport()
                .getOrCreate();

        String sql = "select * from " + hiveTableName +" where "+ hiveTablePartitionName +"=" +"'" +hiveTablePartitionValue+"'";
        final Dataset<Row> inputData = spark.sql(sql);
        LOG.warn("執行的sql語句 ---> " + sql);
        LOG.warn("執行的sql語句的數據量是 ---> " + inputData.count());

        inputData.show();

        final String[] columns = inputData.columns();
        JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
        final Broadcast<String[]> broadcast = jsc.broadcast(columns);

        inputData.javaRDD().foreachPartition(new VoidFunction<Iterator<Row>>() {
            @Override
            public void call(Iterator<Row> iterator) throws Exception {
                // write to ES
                final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
                credentialsProvider.setCredentials(AuthScope.ANY,
                        new UsernamePasswordCredentials(esUser, esPass));  // ES credentials (default username is elastic)
                RestHighLevelClient client = new RestHighLevelClient(
                        RestClient.builder(new HttpHost(esIp, Integer.parseInt(esPort), "http")).setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
                            public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
                                httpClientBuilder.disableAuthCaching();
                                return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                            }
                        })
                );

                if (null == client) {
                    LOG.error("es --> failed to create client!");
                }

                final BulkProcessor.Listener listener = ESUtils.getBulkListener();
                final BulkProcessor bulkProcessor = ESUtils.getBulkProcessor(client, listener, 100);

                final String[] columns = broadcast.value();
                int count = 0;
                while (iterator.hasNext()) {
                    final Row row = iterator.next();
                    final String id = row.getAs(hiveTableUserIdName);
                    final XContentBuilder xContentBuilder = XContentFactory.jsonBuilder().startObject()
                            .field(hiveTableUserIdName, id);
                    for (String colName: columns) {
                        if (!colName.contentEquals(hiveTableUserIdName) ) {
                            Object obj = row.getAs(colName);
                            if (null == obj) {
                                obj = "-1";  // sentinel value for null columns
                            }
                            xContentBuilder.field(colName, obj.toString());
                        }
                    }
                    xContentBuilder.endObject();

                    // docAsUpsert(true) makes the doc double as the upsert document;
                    // a bare upsert() is only a getter and sets nothing.
                    final UpdateRequest update = new UpdateRequest(esTableName, id)
                            .doc(xContentBuilder)
                            .docAsUpsert(true);
                    bulkProcessor.add(update);
//                    final IndexRequest indexRequest = new IndexRequest(esTableName).id(id).source(xContentBuilder).opType(DocWriteRequest.OpType.CREATE);
//                    bulkProcessor.add(indexRequest);
                    count++;
                    if (count >= 100) {
                        bulkProcessor.flush();
                        count = 0;
                    }
                }
                bulkProcessor.flush();
                // awaitClose flushes any remaining actions and closes the processor.
                bulkProcessor.awaitClose(30L, TimeUnit.SECONDS);
                client.close();
            }
        });
    }
}

The bulkAsync approach

import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Iterator;

public class OneIdToEsUserPass {
    private static final Logger LOG = LoggerFactory.getLogger(OneIdToEsUserPass.class);
    public static void main(String[] args) {
        if (args.length < 9) {
            System.out.println("請輸入 hive的one_id表名稱 對應hive表唯一id字段 對應hive表的日期分區字段 對應hive表日期分區的值 寫入es索引名稱 es的Ip地址 es的端口號 es用戶名 es密碼 如:dwd.dwd_one_id_data_source_info_da exist_mark dt 2020-11-10 dwd_one_id_data_source_info_da 192.168.250.116 9200 zs zs123");
            System.exit(1);
        }

        // Parameter list
        //database.table name
        String hiveTableName = args[0];
        String hiveTableUserIdName = args[1];
        String hiveTablePartitionName = args[2];
        String hiveTablePartitionValue = args[3];
        String esTableName = args[4];

        //es
        String esIp = args[5];
        String esPort = args[6];
        String esUser = args[7];
        String esPass= args[8];

//        Optional: drop the target index before reloading (kept commented out).
//        try {
//            final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
//            credentialsProvider.setCredentials(AuthScope.ANY,
//                    new UsernamePasswordCredentials(esUser, esPass));  // ES credentials (default username is elastic)
//            RestHighLevelClient client = new RestHighLevelClient(
//                    RestClient.builder(new HttpHost(esIp, Integer.parseInt(esPort), "http")).setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
//                        public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
//                            httpClientBuilder.disableAuthCaching();
//                            return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
//                        }
//                    })
//            );
//            GetIndexRequest exist = new GetIndexRequest(esTableName);
//            boolean exists = client.indices().exists(exist, RequestOptions.DEFAULT);
//
//            if (exists) {
//                DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(esTableName);
//                AcknowledgedResponse deleteIndexResponse = client.indices().delete(deleteIndexRequest, RequestOptions.DEFAULT);
//                boolean acknowledged = deleteIndexResponse.isAcknowledged();
//                if (acknowledged) {
//                    LOG.warn("ES index deleted ---> " + esTableName);
//                }
//            }
//            client.close();
//        } catch (IOException e) {
//            LOG.error("Failed to delete ES index --> ", e);
//        }

        System.out.println("---------------------------------------------------spark start--------------------------------------------------------");
        SparkSession spark = SparkSession
                .builder()
                .appName("ES_WGP_DATA")
                .enableHiveSupport()
                .getOrCreate();

        String sql = "select * from " + hiveTableName +" where "+ hiveTablePartitionName +"=" +"'" +hiveTablePartitionValue+"'";
        final Dataset<Row> inputData = spark.sql(sql);
        LOG.warn("執行的sql語句 ---> " + sql);
        LOG.warn("執行的sql語句的數據量是 ---> " + inputData.count());

        inputData.show();

        final String[] columns = inputData.columns();
        JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
        final Broadcast<String[]> broadcast = jsc.broadcast(columns);

        inputData.javaRDD().foreachPartition(new VoidFunction<Iterator<Row>>() {
            @Override
            public void call(Iterator<Row> iterator) throws Exception {
                // write to ES
                final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
                credentialsProvider.setCredentials(AuthScope.ANY,
                        new UsernamePasswordCredentials(esUser, esPass));  // ES credentials (default username is elastic)
                RestHighLevelClient client = new RestHighLevelClient(
                        RestClient.builder(new HttpHost(esIp, Integer.parseInt(esPort), "http")).setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
                            public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
                                httpClientBuilder.disableAuthCaching();
                                return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                            }
                        })
                );

                if (null == client) {
                    LOG.error("es --> failed to create client!");
                }

                // Listener used for the async variant (bulkAsync).
//                final ActionListener<BulkResponse> listener = ESUtils.getActionListener();
                BulkRequest request = new BulkRequest();
                request.timeout("3m");

                final String[] columns = broadcast.value();
                int count = 0;
                while (iterator.hasNext()) {
                    final Row row = iterator.next();
                    final String id = row.getAs(hiveTableUserIdName);
                    final XContentBuilder xContentBuilder = XContentFactory.jsonBuilder().startObject()
                            .field(hiveTableUserIdName, id);
                    for (String colName: columns) {
                        if (!colName.contentEquals(hiveTableUserIdName) ) {
                            Object obj = row.getAs(colName);
                        if (null == obj) {
                                obj = "-1";  // sentinel value for null columns
                            }
                            xContentBuilder.field(colName, obj.toString());
                        }
                    }
                    xContentBuilder.endObject();

                    // docAsUpsert(true) makes the doc double as the upsert document;
                    // a bare upsert() is only a getter and sets nothing.
                    final UpdateRequest update = new UpdateRequest(esTableName, id)
                            .doc(xContentBuilder)
                            .docAsUpsert(true);
                    request.add(update);
//                    final IndexRequest indexRequest = new IndexRequest(esTableName).id(id).source(xContentBuilder).opType(DocWriteRequest.OpType.CREATE);
//                    request.add(indexRequest);
                    count++;
                    if (count >= 100) {
//                        client.bulkAsync(request, RequestOptions.DEFAULT, listener);
                        client.bulk(request, RequestOptions.DEFAULT);
                        // A BulkRequest is not reusable: start a fresh batch after each bulk,
                        // otherwise already-sent actions accumulate and get re-sent.
                        request = new BulkRequest();
                        request.timeout("3m");
                        count = 0;
                    }
                }
                if (request.numberOfActions() > 0) {
                    client.bulk(request, RequestOptions.DEFAULT);
                }
//                client.bulkAsync(request, RequestOptions.DEFAULT, listener);
                client.close();
            }
        });
    }
}

[Figure: es寫入.png — screenshot of the resulting ES write]

Notes

  1. With the first (BulkProcessor) approach, the UpdateRequest doc shows no deleted documents; with the second approach, deleted documents do show up. (An update marks the previous document version as deleted, which is what appears in the index's docs.deleted count.)
  2. UpdateRequest update = new UpdateRequest(esTableName, id).doc(xContentBuilder).upsert() does not take effect, because the no-argument upsert() is only a getter. Either call docAsUpsert(true) as in the code above, or pass an IndexRequest to upsert(...), as in this older TransportClient-style snippet:

BulkRequestBuilder bulkRequest = esBase.getClient().prepareBulk();
for (int i = 0; i < data.size(); i++) {
    T t = data.get(i);
    String json = JSONObject.toJSONString(t);
    IndexRequest indexRequest = new IndexRequest(index, type, t.get_id()).source(json).parent(parentIdList.get(i));
    UpdateRequest updateRequest = new UpdateRequest(index, type, t.get_id()).parent(parentIdList.get(i)).doc(json).upsert(indexRequest);

    bulkRequest.add(updateRequest);
}
BulkResponse bulkResponse = bulkRequest.execute().actionGet();
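
For ES7 with the RestHighLevelClient, the equivalent upsert looks like the sketch below; the endpoint, index name, document id, and field values are placeholders:

import org.apache.http.HttpHost;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;

import java.util.HashMap;
import java.util.Map;

public class Es7UpsertDemo {
    public static void main(String[] args) throws Exception {
        // Placeholder endpoint; substitute your own ES host and port.
        RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(new HttpHost("localhost", 9200, "http")));

        Map<String, Object> doc = new HashMap<>();
        doc.put("field", "value");

        BulkRequest bulk = new BulkRequest();
        // Variant 1: an explicit IndexRequest as the upsert document.
        bulk.add(new UpdateRequest("demo_index", "doc-1")
                .doc(doc)
                .upsert(new IndexRequest("demo_index").id("doc-1").source(doc)));
        // Variant 2: docAsUpsert(true) lets the doc itself double as the upsert document.
        bulk.add(new UpdateRequest("demo_index", "doc-2")
                .doc(doc)
                .docAsUpsert(true));

        BulkResponse response = client.bulk(bulk, RequestOptions.DEFAULT);
        if (response.hasFailures()) {
            System.err.println(response.buildFailureMessage());
        }
        client.close();
    }
}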
