1.elasticsearch-river-kafka 插件的安装
elasticsearch-river-kafka 插件的安装与其他插件一样
cd $ELASTICSEARCH_HOME ./bin/plugin -url file:/$PLUGIN_PATH -install elasticsearch-river-kafka |
插件更新
cd $ELASTICSEARCH_HOME ./bin/plugin -remove elasticsearch-river-kafka ./bin/plugin -url file:/$PLUGIN_PATH -install elasticsearch-river-kafka |
2.river节点的配置
配置river节点的时候,river节点和非river节点都要配置。
river节点:在es的配置文件中添加下面几行
#node.river: _none_ ##这一行要注释掉,表示为river节点 threadpool: bulk:
type: fixed
size: 60
queue_size: 1000
|
非river节点:在es的配置文件中添加下面几行
node.river: _none_ ##这一行要解注,表示该节点不是river节点 threadpool: bulk:
type: fixed
size: 60
queue_size: 1000
|
注意:一般,不会将数据落在river节点上(即node.data: false),但测试环境上就无所谓了,机器资源又紧张。
节点配置完后,记得重启es,重启es的顺序:master节点→data节点→river节点
3.elasticsearch-river-kafka 插件的开发
社区中的elasticsearch-river-kafka 插件仅提供了对String和json数据的简单处理。在实际生产中,我们遇到的情况要复杂得多。
那么这个时候,我们就得自己去开发elasticsearch-river-kafka 插件实现一些附加功能。
下面就简单介绍一下开发elasticsearch-river-kafka 插件的步骤
1)KafkaRiverPlugin
该类需要继承KafkaRiverPlugin和实现AbstractPlugin,在该类中定义plugin的名称和描述
@Override public String name() {
return "river-kafka" ;
}
@Override
public String description() {
return "River Kafka Plugin" ;
}
|
2)es-plugin.properties配置文件
需要在es-plugin.properties中添加如下的定义,这样ES在启动的时候就能够通过org.elasticsearch.plugins.PluginManager
在当前的classpath中扫描到我们的plugin。
注意:定义中要写KafkaRiverPlugin类的全称,es-plugin.properties一般位于src/main/resources下
plugin=com.test.elasticsearch.plugin.river.kafka.KafkaRiverPlugin |
3)KafkaRiverModule
KafkaRiverPlugin的onModule方法:在ES加载所有的插件时,会invoke一个onModule方法。KafkaRiverModule会作为参数传进来
public void onModule(RiversModule module) {
module.registerRiver( "kafka" , KafkaRiverModule. class );
}
|
KafkaRiverModule必须继承AbstractModule 。在KafkaRiverModule中会生成一个KafkaRiver。KafkaRiver是River接口的实现。
public class KafkaRiverModule extends AbstractModule {
@Override
protected void configure() {
bind(River. class ).to(KafkaRiver. class ).asEagerSingleton();
}
} |
4)KafkaRiver
– KafkaRiver必须继承AbstractRiverComponent,并且实现River接口。
– KafkaRiver只提供两个方法:start和close。
– AbstractRiverComponent 用于initialize kafkariver的logger、river名、river的配置
– 构造函数通过@Inject注入river所需要的一切东西:RiverName, RiverSettings、logger、自定义的配置信息
(这里是BasicProperties,在BasicProperties中定义的配置参数可以在创建river的时候被指定,参见“4.kafka→river→es的数据存储”)
– 在start方法中启动了kafkariver的线程。在这个线程中,将数据从kafka中读取数据,然后将这些数据写到es中。
– kafkaConsumer用来定义从kafka中读取数据时的用户操作。
– ElasticsearchProducer用来定义将数据写入ES时的用户操作。
public class KafkaRiver extends AbstractRiverComponent implements River {
private BasicProperties properties;
private KafkaConsumer kafkaConsumer;
private ElasticsearchProducer elasticsearchProducer;
private static ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
private Thread riverMonitorThread;
private KafkaRiverSubMonitor kafkaRiverSubMonitor;
private Thread thread;
private ESLogger logger;
@Inject
protected KafkaRiver(RiverName riverName, RiverSettings settings, Client client) {
super (riverName, settings);
this .logger = Loggers.getLogger(getClass(), settings.globalSettings(), riverName);
properties = new BasicProperties(settings);
elasticsearchProducer = new ElasticsearchProducer(client, properties);
kafkaConsumer = new KafkaConsumer(riverName, properties, elasticsearchProducer);
}
@Override
public void start() {
//启动KafkaRiver的线程
try {
logger.info( "MHA: Starting Kafka Worker..." );
thread = EsExecutors.daemonThreadFactory(settings.globalSettings(), "kafka_river" ).newThread(kafkaConsumer);
thread.start();
} catch (Exception ex) {
logger.error( "Unexpected Error occurred" , ex);
throw new RuntimeException(ex);
}
}
......
} |
4.kafka→river→es的数据存储
通过下面的指令,可以创建一条river,这样从kafka的baymaxtest的topic中的数据通过river就会落到es上。
注意:一个集群可以创建多个river,各river可以指定不同的topic、patition和序列化类
"type" : "kafka" ,
"kafka" : {
"topic" : "test" ,
"numOfConsumer" : "2" ,
"zk.connect" : "10.10.10.10:2181" ,
"zk.session.timeout.ms" : "50000" ,
"zk.sync.time.ms" : "200" ,
"zk.auto.commit.interval.ms" : "1000" ,
"zk.auto.commit.enable" : "true" ,
"zk.auto.offset.reset" : "smallest" ,
"zk.fetch.message.max.bytes" : "5242880" ,
"serializer" : "com.test.elasticsearch.river.kafka.serializer.AASerializer"
},
"elasticsearch" : {
"indexName" : "stringfortest" ,
"indexType" : "message1" ,
"batch_size" : "500" ,
"handling_batch_coresize" : "2" ,
"handling_batch_maximumPoolSize" : "2" ,
"handling_batch_keepAliveTime" : "600" ,
"handling_batch_queueSize" : "10" ,
"es_bulk_timeout" : "5"
}
}' |
上述指令中主要配置信息的说明:
kafka中 →
topic:kafka的topic名为test,
numOfConsumer:从kafka中读取数据的消费者个数
zk.connect:zookper的host名
serializer:对从kafka中来的数据的序列化类
elasticsearch中 →
indexName:在es中生成的index名,从该river中通过的数据会落到这个index中
indexType:index的type
es_bulk_timeout:es批量处理的timeout
上述指令会返回下面的结果
{ "_index" : "_river" ,
"_type" : "baymaxriver1" ,
"_id" : "_meta" ,
"_version" : 1 ,
"created" : true
} |
查看river的元数据:http://ip:9200/_river/rivername/_meta
删除一条river
curl -XDELETE 'http://localhost:9200/_river/rivername'
|
相关推荐
# this will create a file here: target/releases/elasticsearch-river-kafka-1.0.1-SNAPSHOT.zip PLUGIN_PATH=`pwd`/target/releases/elasticsearch-river-kafka-1.0.1-SNAPSHOT.zip 安装插件 cd $ELASTIC...
解压文件 elasticsearch-head.zip 安装插件: google ---》更多工具----》扩展程序 打开Google的扩展程序,点击加载已解压的扩展程序,选择解压elasticsearch-head文件夹即可添加插件成功
elasticsearch-river-jdbc-1.5.0.5.jar
elasticsearch-river-neo4j.zip,一个用于弹性搜索的river插件,它提供了一种为neo4j图形数据库建立索引的简单方法selasticsearch river插件
Elasticsearch-head谷歌插件,直接导入即可;目前是0.1.5_0版本,很好用; 大家可以试试。
jar包,官方版本,自测可用
jar包,官方版本,自测可用
用于hbase同步数据到es(river机制)
ETL工具kettle7.1抽取数据目前不支持elasticsearch 2.X以上版本,如果想要支持elasticsearch 6.X以上版本,必须替换elasticsearch-bulk-insert-plugin插件,该资源提供该插件的替换。 具体步骤为在spoon kettle\data-...
赠送jar包:elasticsearch-rest-client-6.8.3.jar; 赠送原API文档:elasticsearch-rest-client-6.8.3-javadoc.jar; 赠送源代码:elasticsearch-rest-client-6.8.3-sources.jar; 赠送Maven依赖信息文件:elastic...
elasticsearch-head谷歌插件,有了这个插件,就不用去es里面安装head插件了,直接就可以使用,避免了安装head可能会遇到的问题
es-sql安装插件,先将该插件上传到服务器,然后离线安装,解决的在线安装因网络资源慢导致安装失败问题
赠送jar包:elasticsearch-x-content-6.3.0.jar; 赠送原API文档:elasticsearch-x-content-6.3.0-javadoc.jar; 赠送源代码:elasticsearch-x-content-6.3.0-sources.jar; 赠送Maven依赖信息文件:elasticsearch-x...
elasticsearch-head插件,带有安装说明。
elasticsearch-6.1.2.tar.gz,elasticsearch-5.5.2.tar和head插件 linux下安装软件,下载后即可安装使用
赠送jar包:elasticsearch-rest-high-level-client-6.8.3.jar; 赠送原API文档:elasticsearch-rest-high-level-client-6.8.3-javadoc.jar; 赠送源代码:elasticsearch-rest-high-level-client-6.8.3-sources.jar;...
elasticsearch-head-5.0.0.zip
使用checkout tag: v5.1.2git checkout v5.1.2运行gradle buildPluginZip创建 directory ${path.home}/plugins/jieba复制zip 文件到分词插件cp build/distributions/elasticsearch-jieba-plugin-5.1.2.zip ${...
elasticsearch-head 用于chrome的插件,有了这个插件我们就不需要在服务器配置head了
elasticsearch-head-chrome es chrome插件 elasticsearch-head-chrome es chrome插件 elasticsearch-head-chrome es chrome插件 elasticsearch-head-chrome es chrome插件 elasticsearch-head-chrome es chrome插件 ...