澳门新葡萄京官网注册 9

澳门新葡萄京官网注册基于Java、Kafka、ElasticSearch的搜索框架的设计与实现

Jkes是一个基于Java、Kafka、ElasticSearch的搜索框架。Jkes提供了注解驱动的JPA风格的对象/文档映射,使用REST
API用于文档搜索。

ElasticSearch是一个高度可扩展的开源搜索引擎并使用REST
API,所以您值得拥有。
在本教程中,将介绍开始使用ElasticSearch的一些主要概念。

项目主页

下载并运行ElasticSearch

ElasticSearch可以从elasticsearch.org下载对应的文件格式,如ZIPTAR.GZ。下载并提取一个运行它的软件包之后不会容易得多,需要提前安装Java运行时环境。

安装

可以参考jkes-integration-test项目快速掌握jkes框架的使用方法。jkes-integration-test是我们用来测试功能完整性的一个Spring
Boot Application。

  • 安装jkes-index-connectorjkes-delete-connector到Kafka
    Connect类路径
  • 安装 Smart Chinese Analysis Plugin

    sudo bin/elasticsearch-plugin install analysis-smartcn

在Windows上运行ElasticSearch

在本文章中,所使用的环境是Windows,所以这里只介绍在Windows上运行ElasticSearch,可从命令窗口运行位于bin文件夹中的elasticsearch.bat。这将会启动ElasticSearch在控制台的前台运行,这意味着我们可在控制台中看到运行信息或一些错误信息,并可以使用**CTRL

  • C**停止或关闭它。

当前版本是: elasticsearch-5.2.0
下载链接:
http://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-5.2.0.zip

把下载好的文件 elasticsearch-5.2.0.zip 解压到
D:softwareelasticsearch-5.2.0,其目录结构如下所示 –

澳门新葡萄京官网注册 1

启动 ElasticSearch

Microsoft Windows [版本 10.0.10586]
(c) 2015 Microsoft Corporation。保留所有权利。

C:UsersAdministrator>d:

D:>cd softwareelasticsearch-5.2.0

D:softwareelasticsearch-5.2.0>cd bin

D:softwareelasticsearch-5.2.0bin>elasticsearch.bat
[2017-01-28T14:10:32,177][INFO ][o.e.n.Node               ] [] initializing ...
[2017-01-28T14:10:32,670][INFO ][o.e.e.NodeEnvironment    ] [SnafGWM] using [1] data paths, mounts [[Software (D:)]], net usable_space [61.6gb], net total_space [139gb], spins? [unknown], types [NTFS]
[2017-01-28T14:10:32,686][INFO ][o.e.e.NodeEnvironment    ] [SnafGWM] heap size [1.9gb], compressed ordinary object pointers [true]
[2017-01-28T14:10:32,686][INFO ][o.e.n.Node               ] node name [SnafGWM] derived from node ID [SnafGWMWRzmfwTKP6VJClA]; set [node.name] to override
[2017-01-28T14:10:32,717][INFO ][o.e.n.Node               ] version[5.2.0], pid[9724], build[24e05b9/2017-01-24T19:52:35.800Z], OS[Windows 10/10.0/amd64], JVM[Oracle Corporation/Java HotSpot(TM) 64-Bit Server VM/1.8.0_65/25.65-b01]
[2017-01-28T14:10:35,271][INFO ][o.e.p.PluginsService     ] [SnafGWM] loaded module [aggs-matrix-stats]
[2017-01-28T14:10:35,271][INFO ][o.e.p.PluginsService     ] [SnafGWM] loaded module [ingest-common]
[2017-01-28T14:10:35,271][INFO ][o.e.p.PluginsService     ] [SnafGWM] loaded module [lang-expression]
[2017-01-28T14:10:35,271][INFO ][o.e.p.PluginsService     ] [SnafGWM] loaded module [lang-groovy]
[2017-01-28T14:10:35,271][INFO ][o.e.p.PluginsService     ] [SnafGWM] loaded module [lang-mustache]
[2017-01-28T14:10:35,287][INFO ][o.e.p.PluginsService     ] [SnafGWM] loaded module [lang-painless]
[2017-01-28T14:10:35,287][INFO ][o.e.p.PluginsService     ] [SnafGWM] loaded module [percolator]
[2017-01-28T14:10:35,288][INFO ][o.e.p.PluginsService     ] [SnafGWM] loaded module [reindex]
[2017-01-28T14:10:35,290][INFO ][o.e.p.PluginsService     ] [SnafGWM] loaded module [transport-netty3]
[2017-01-28T14:10:35,291][INFO ][o.e.p.PluginsService     ] [SnafGWM] loaded module [transport-netty4]
[2017-01-28T14:10:35,292][INFO ][o.e.p.PluginsService     ] [SnafGWM] no plugins loaded
[2017-01-28T14:10:41,394][INFO ][o.e.n.Node               ] initialized
[2017-01-28T14:10:41,397][INFO ][o.e.n.Node               ] [SnafGWM] starting ...
[2017-01-28T14:10:42,657][INFO ][o.e.t.TransportService   ] [SnafGWM] publish_address {127.0.0.1:9300}, bound_addresses {127.0.0.1:9300}, {[::1]:9300}
[2017-01-28T14:10:46,439][INFO ][o.e.c.s.ClusterService   ] [SnafGWM] new_master {SnafGWM}{SnafGWMWRzmfwTKP6VJClA}{vG5mFSENST6eo-yl_O8HuA}{127.0.0.1}{127.0.0.1:9300}, reason: zen-disco-elected-as-master ([0] nodes joined)
[2017-01-28T14:10:48,628][INFO ][o.e.h.HttpServer         ] [SnafGWM] publish_address {127.0.0.1:9200}, bound_addresses {127.0.0.1:9200}, {[::1]:9200}
[2017-01-28T14:10:48,628][INFO ][o.e.n.Node               ] [SnafGWM] started
[2017-01-28T14:10:48,928][INFO ][o.e.g.GatewayService     ] [SnafGWM] recovered [0] indices into cluster_state

在启动过程中,ElasticSearch的实例运行会占用大量的内存,所以在这一过程中,电脑会变得比较慢,需要耐心等待,启动加载完成后电脑就可以正常使用了。

如果您没有安装Java运行时或没有正确配置,应该不会看到像上面的输出,而是一个消息说“JAVA_HOME环境变量必须设置!
要解决这个问题,首先下载并安装Java,其次,确保已正确配置JAVA_HOME环境变量(或参考

  • Java
    JDK安装和配置)。

配置

  • 引入jkes-spring-data-jpa依赖
  • 添加配置

@EnableAspectJAutoProxy
@EnableJkes
@Configuration
public class JkesConfig {

  @Bean
  public PlatformTransactionManager transactionManager(EntityManagerFactory factory, EventSupport eventSupport) {

    return new SearchPlatformTransactionManager(new JpaTransactionManager(factory), eventSupport);
  }
}

提供JkesProperties Bean

@Component
@Configuration
public class JkesConf extends DefaultJkesPropertiesImpl {

    @PostConstruct
    public void setUp() {
        Config.setJkesProperties(this);
    }

    @Override
    public String getKafkaBootstrapServers() {
        return "k1-test.com:9292,k2-test.com:9292,k3-test.com:9292";
    }

    @Override
    public String getKafkaConnectServers() {
        return "http://k1-test.com:8084,http://k2-test.com:8084,http://k3-test.com:8084";
    }

    @Override
    public String getEsBootstrapServers() {
        return "http://es1-test.com:9200,http://es2-test.com:9200,http://es3-test.com:9200";
    }

    @Override
    public String getDocumentBasePackage() {
        return "com.timeyang.jkes.integration_test.domain";
    }

    @Override
    public String getClientId() {
        return "integration_test";
    }

}

这里可以很灵活,如果使用Spring
Boot,可以使用@ConfigurationProperties提供配置

增加索引管理端点
因为我们不知道客户端使用的哪种web技术,所以索引端点需要在客户端添加。比如在Spring MVC中,可以按照如下方式添加索引端点

@RestController
@RequestMapping("/api/search")
public class SearchEndpoint {

    private Indexer indexer;

    @Autowired
    public SearchEndpoint(Indexer indexer) {
        this.indexer = indexer;
    }

    @RequestMapping(value = "/start_all", method = RequestMethod.POST)
    public void startAll() {
        indexer.startAll();
    }

    @RequestMapping(value = "/start/{entityClassName:.+}", method = RequestMethod.POST)
    public void start(@PathVariable("entityClassName") String entityClassName) {
        indexer.start(entityClassName);
    }

    @RequestMapping(value = "/stop_all", method = RequestMethod.PUT)
    public Map<String, Boolean> stopAll() {
        return indexer.stopAll();
    }

    @RequestMapping(value = "/stop/{entityClassName:.+}", method = RequestMethod.PUT)
    public Boolean stop(@PathVariable("entityClassName") String entityClassName) {
        return indexer.stop(entityClassName);
    }

    @RequestMapping(value = "/progress", method = RequestMethod.GET)
    public Map<String, IndexProgress> getProgress() {
        return indexer.getProgress();
    }

}

使用REST API与Sense

ElasticSearch的实例并运行,您可以使用localhost:9200,基于JSON的REST
API与ElasticSearch进行通信。使用任何HTTP客户端来通信。在ElasticSearch自己的文档中,所有示例都使用curl。
但是,当使用API时也可使用图形客户端(如Fiddler或RESTClient),这样操作起更方便直观一些。

更方便的是Chrome插件Sense。
Sense提供了一个专门用于使用ElasticSearch的REST API的简单用户界面。
它还具有许多方便的功能,例如:ElasticSearch的查询语法的自动完成功能以及curl格式的复制和粘贴请求,从而可以方便地在文档中运行示例。

我们将在本教程中使用sense来执行curl请求,建议安装Sense并使用它学习后续文章内容。

安装完成后,在Chrome的右上角找到Sense的图标。
第一次单击它运行Sense时,会为您准备一个非常简单的示例请求。如下图所示 –

澳门新葡萄京官网注册 2

快速开始

上述请求将执行最简单的搜索查询,匹配服务器上所有索引中的所有文档。针对ElasticSearch运行,Sense提供的最简单的查询,在响应结果的数据中并没有查询到任何数据,因为没有任何索引。如下所示

{
   "took": 1,
   "timed_out": false,
   "_shards": {
      "total": 0,
      "successful": 0,
      "failed": 0
   },
   "hits": {
      "total": 0,
      "max_score": 0,
      "hits": []
   }
}

下一步我们来学习添加一些数据和索引,来修复这个问题。

索引API

使用com.timeyang.jkes.core.annotation包下相关注解标记实体

@lombok.Data
@Entity
@Document
public class Person extends AuditedEntity {

    // @Id will be identified automatically
    // @Field(type = FieldType.Long)
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    @MultiFields(
            mainField = @Field(type = FieldType.Text),
            otherFields = {
                    @InnerField(suffix = "raw", type = FieldType.Keyword),
                    @InnerField(suffix = "english", type = FieldType.Text, analyzer = "english")
            }
    )
    private String name;

    @Field(type = FieldType.Keyword)
    private String gender;

    @Field(type = FieldType.Integer)
    private Integer age;

    // don't add @Field to test whether ignored
    // @Field(type = FieldType.Text)
    private String description;

    @Field(type = FieldType.Object)
    @ManyToOne(fetch = FetchType.EAGER)
    @JoinColumn(name = "group_id")
    private PersonGroup personGroup;

}

@lombok.Data
@Entity
@Document(type = "person_group", alias = "person_group_alias")
public class PersonGroup extends AuditedEntity {
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;
    private String name;
    private String interests;
    @OneToMany(fetch = FetchType.EAGER, cascade = CascadeType.ALL, mappedBy = "personGroup", orphanRemoval = true)
    private List<Person> persons;
    private String description;

    @DocumentId
    @Field(type = FieldType.Long)
    public Long getId() {
        return id;
    }

    @MultiFields(
            mainField = @Field(type = FieldType.Text),
            otherFields = {
                    @InnerField(suffix = "raw", type = FieldType.Keyword),
                    @InnerField(suffix = "english", type = FieldType.Text, analyzer = "english")
            }
    )
    public String getName() {
        return name;
    }

    @Field(type = FieldType.Text)
    public String getInterests() {
        return interests;
    }

    @Field(type = FieldType.Nested)
    public List<Person> getPersons() {
        return persons;
    }

    /**
     * 不加Field注解,测试序列化时是否忽略
     */
    public String getDescription() {
        return description;
    }
}

当更新实体时,文档会被自动索引到ElasticSearch;删除实体时,文档会自动从ElasticSearch删除。

文档管理(CRUD)

想要使用ElasticSearch,用于搜索第一步就是使用一些数据填充来索引,CRUD表“创建”或者“索引”。我们还将学习如何更新,读取和删除文档。

搜索API

启动搜索服务jkes-search-service,搜索服务是一个Spring
Boot Application,提供rest搜索api,默认运行在9000端口。

URI query

curl -XPOST localhost:9000/api/v1/integration_test_person_group/person_group/_search?from=3&size=10

Nested query

integration_test_person_group/person_group/_search?from=0&size=10
{
  "query": {
    "nested": {
      "path": "persons",
      "score_mode": "avg",
      "query": {
        "bool": {
          "must": [
            {
              "range": {
                "persons.age": {
                  "gt": 5
                }
              }
            }
          ]
        }
      }
    }
  }
}

match query

integration_test_person_group/person_group/_search?from=0&size=10
{
  "query": {
      "match": {
        "interests": "Hadoop"
      }
    }
}

bool query

{
  "query": {
    "bool" : {
      "must" : {
        "match" : { "interests" : "Hadoop" }
      },
      "filter": {
        "term" : { "name.raw" : "name0" }
      },
      "should" : [
        { "match" : { "interests" : "Flink" } },
        {
            "nested" : {
                "path" : "persons",
                "score_mode" : "avg",

                "query" : {
                    "bool" : {
                        "must" : [
                        { "match" : {"persons.name" : "name40"} },
                        { "match" : {"persons.interests" : "interests"} }
                        ],
                        "must_not" : {
                            "range" : {
                              "age" : { "gte" : 50, "lte" : 60 }
                            }
                          }
                    }
                }
            }
        }

      ],
      "minimum_should_match" : 1,
      "boost" : 1.0
    }

  }

}

Source filtering

integration_test_person_group/person_group/_search
{
    "_source": false,
    "query" : {
        "match" : { "name" : "name17" }
    }
}

integration_test_person_group/person_group/_search
{
    "_source": {
            "includes": [ "name", "persons.*" ],
            "excludes": [ "date*", "version", "persons.age" ]
        },
    "query" : {
        "match" : { "name" : "name17" }
    }
}

prefix

integration_test_person_group/person_group/_search
{ 
  "query": {
    "prefix" : { "name" : "name" }
  }
}

wildcard

integration_test_person_group/person_group/_search
{
    "query": {
        "wildcard" : { "name" : "name*" }
    }
}

regexp

integration_test_person_group/person_group/_search
{
    "query": {
        "regexp":{
            "name": "na.*17"
        }
    }
}

创建索引

在ElasticSearch索引中,对应于CRUD中的“创建”和“更新” –
如果对具有给定类型的文档进行索引,并且要插入原先不存在的ID。
如果具有相同类型和ID的文档已存在,则会被覆盖。

要索引第一个JSON对象,我们对REST
API
创建一个PUT请求到一个由索引名称,类型名称和ID组成的URL。
也就是:http://localhost:9200/<index>/<type>/[<id>]

索引和类型是必需的,而id部分是可选的。如果不指定IDElasticSearch会为我们生成一个ID
但是,如果不指定id,应该使用HTTP的POST而不是PUT请求。

索引名称是任意的。如果服务器上没有此名称的索引,则将使用默认配置来创建一个索引。

至于类型名称,它也是任意的。 它有几个用途,包括:

  • 每种类型都有自己的ID空间。
  • 不同类型具有不同的映射(“模式”,定义属性/字段应如何编制索引)。
  • 搜索多种类型是可以的,并且也很常见,但很容易搜索一种或多种指定类型。

现在我们来索引一些内容!
可以把任何东西放到索引中,只要它可以表示为单个JSON对象。
在本教程中,使用索引和搜索电影的一个示例。这是一个经典的电影对象信息:

{
    "title": "The Godfather",
    "director": "Francis Ford Coppola",
    "year": 1972
}

要创建一个索引,这里使用索引的名称为“movies”,类型名称(“movie”)和id(“1”),并按照上述模式使用JSON对象在正文中进行请求。

curl -XPUT "http://localhost:9200/movies/movie/1" -d'
{
    "title": "The Godfather",
    "director": "Francis Ford Coppola",
    "year": 1972
}'

Jkes工作原理

索引工作原理:

  • 应用启动时,Jkes扫描所有标注@Document注解的实体,为它们构建元数据。
  • 基于构建的元数据,创建indexmappingJson格式的配置,然后通过ElasticSearch Java Rest Client将创建/更新index配置。
  • 为每个文档创建/更新Kafka ElasticSearch Connector,用于创建/更新文档
  • 为整个项目启动/更新Jkes Deleter Connector,用于删除文档
  • 拦截数据操作方法。将* save(*)方法返回的数据包装为SaveEvent保存到EventContainer;使用(* delete*(..)方法的参数,生成一个DeleteEvent/DeleteAllEvent保存到EventContainer
  • 拦截事务。在事务提交后使用JkesKafkaProducer发送SaveEvent中的实体到Kafka,Kafka会使用我们提供的JkesJsonSerializer序列化指定的数据,然后发送到Kafka。
  • SaveEvent不同,DeleteEvent会直接被序列化,然后发送到Kafka,而不是只发送一份数据
  • SaveEventDeleteEvent不同,DeleteAllEvent不会发送数据到Kafka,而是直接通过ElasticSearch Java Rest Client删除相应的index,然后重建该索引,重启Kafka ElasticSearch Connector

查询工作原理:

  • 查询服务通过rest api提供
  • 我们没有直接使用ElasticSearch进行查询,因为我们需要在后续版本使用机器学习进行搜索排序,而直接与ElasticSearch进行耦合,会增加搜索排序API的接入难度
  • 查询服务是一个Spring Boot Application,使用docker打包为镜像
  • 查询服务提供多版本API,用于API进化和兼容
  • 查询服务解析json请求,进行一些预处理后,使用ElasticSearch Java Rest Client转发到ElasticSearch,将得到的响应进行解析,进一步处理后返回到客户端。
  • 为了便于客户端人员开发,查询服务提供了一个查询UI界面,开发人员可以在这个页面得到预期结果后再把json请求体复制到程序中。

可以使用curl来执行它,也可以使用Sense。这里使用Sense,可以自己填充URL,方法和请求正文,或者您以复制上述curl示例,将光标置于Sense中的正文字段中写入上面的Json对象,然后按点击绿色小箭头来执行创建索引操作。如下图所示

澳门新葡萄京官网注册 3

执行请求后,可以看到接收到来自ElasticSearch响应的JSON对象。如下所示 –

{
   "_index": "movies",
   "_type": "movie",
   "_id": "1",
   "_version": 1,
   "result": "created",
   "_shards": {
      "total": 2,
      "successful": 1,
      "failed": 0
   },
   "created": true
}

响应对象包含有关索引操作的信息,例如它是否成功(“ok”)和文档ID,如果不指定则ElasticSearch会自己生成一个。

如果运行Sense提供的默认搜索请求(可以使用Sense中的“历史记录”按钮访问,因为确实已执行它)过了,就会看到返回有数据的结果。

{
   "took": 146,
   "timed_out": false,
   "_shards": {
      "total": 5,
      "successful": 5,
      "failed": 0
   },
   "hits": {
      "total": 1,
      "max_score": 1,
      "hits": [
         {
            "_index": "movies",
            "_type": "movie",
            "_id": "1",
            "_score": 1,
            "_source": {
               "title": "The Godfather",
               "director": "Francis Ford Coppola",
               "year": 1972
            }
         }
      ]
   }
}

在上面返回结果中,看到的是搜索结果而不是错误或是空的结果。

流程图

澳门新葡萄京官网注册 4

更新索引

现在,在索引中有了一部电影信息,接下来来了解如何更新它,添加一个类型列表。要做到这一点,只需使用相同的ID索引它。使用与之前完全相同的索引请求,但类型扩展了JSON对象。

curl -XPUT "http://localhost:9200/movies/movie/1" -d'
{
    "title": "The Godfather",
    "director": "Francis Ford Coppola",
    "year": 1972,
    "genres": ["Crime", "Drama"]
}'

模块介绍

ElasticSearch的响应结果与前面的大体上一样,但有一点区别,结果对象中的_version属性的值为2,而不是1。响应结果如下

{
   "_index": "movies",
   "_type": "movie",
   "_id": "1",
   "_version": 2,
   "result": "updated",
   "_shards": {
      "total": 2,
      "successful": 1,
      "failed": 0
   },
   "created": false
}

版本号(_version)可用于跟踪文档已编入索引的次数。它的主要目的是允许乐观的并发控制,因为可以在索引请求中提供一个版本,如果提供的版本高于索引中的版本,ElasticSearch将只覆盖文档内容,ID值不变,版本号自动添加。

jkes-core

jkes-core是整个jkes的核心部分。主要包括以下功能:

  • annotation包提供了jkes的核心注解
  • elasticsearch包封装了elasticsearch相关的操作,如为所有的文档创建/更新索引,更新mapping
  • kafka包提供了Kafka 生产者,Kafka Json Serializer,Kafka Connect
    Client
  • metadata包提供了核心的注解元数据的构建与结构化模型
  • event包提供了事件模型与容器
  • exception包提供了常见的Jkes异常
  • http包基于Apache Http Client封装了常见的http json请求
  • support包暴露了Jkes核心配置支持
  • util包提供了一些工具类,便于开发。如:Asserts, ClassUtils,
    DocumentUtils, IOUtils, JsonUtils, ReflectionUtils, StringUtils

由ID获取文档/索引

上面已经学习了索引新文档以及更新存在的文档。还看到了一个简单搜索请求的示例。如果只是想检索一个具有已知ID的索引,一个方法是搜索索引中的文档。另一个简单而快速的方法是通过ID,使用GET来检索它。

简单的做法是向同一个URL发出一个GET请求,URL的ID部分是强制性的。通过ID从ElasticSearch中检索文档可发出URL的GET请求:http://localhost:9200/<index>/<type>/<id>

使用以下请求尝试获取电影信息:

curl -XGET "http://localhost:9200/movies/movie/1" -d''

执行结果如下所示 –

澳门新葡萄京官网注册 5

正如下图所看到的,结果对象包含与索引时所看到的类似的元数据,如索引,类型和版本信息。
最后最重要的是,它有一个名称为“_source”的属性,它包含实际获取的文档信息。

关于GET没有什么可说的,因为它很简单,继续最后删除操作。

jkes-boot

jkes-boot用于与一些第三方开源框架进行集成。

当前,我们通过jkes-spring-data-jpa,提供了与spring data jpa的集成。通过使用Spring的AOP机制,对Repository方法进行拦截,生成SaveEvent/DeleteEvent/DeleteAllEvent保存到EventContainer。通过使用我们提供的SearchPlatformTransactionManager,对常用的事务管理器(如JpaTransactionManager)进行包装,提供事务拦截功能。

在后续版本,我们会提供与更多框架的集成。

jkes-spring-data-jpa说明:

  • ContextSupport类用于从bean工厂获取Repository Bean
  • @EnableJkes让客户端能够轻松开启Jkes的功能,提供了与Spring一致的配置模型
  • EventSupport处理事件的细节,在保存和删除数据时生成相应事件存放到EventContainer,在事务提交和回滚时处理相应的事件
  • SearchPlatformTransactionManager包装了客户端的事务管理器,在事务提交和回滚时加入了回调hook
  • audit包提供了一个简单的AuditedEntity父类,方便添加审计功能,版本信息可用于结合ElasticSearch的版本机制保证不会索引过期文档数据
  • exception包封装了常见异常
  • intercept包提供了AOP切点和切面
  • index包提供了全量索引功能。当前,我们提供了基于线程池的索引机制和基于ForkJoin的索引机制。在后续版本,我们会重构代码,增加基于阻塞队列生产者-消费者模式,提供并发性能

删除文档

为了通过ID从索引中删除单个指定的文档,使用与获取索引文档相同的URL,只是这里将HTTP方法更改为DELETE

curl -XDELETE "http://localhost:9200/movies/movie/1" -d''

响应对象包含元数据方面的一些常见数据字段,以及名为“_found”的属性,表示文档确实已找到并且操作成功。

澳门新葡萄京官网注册 6

在执行DELETE调用后切换回GET,可以验证文档是否确实已删除。

jkes-services

jkes-services主要用来提供一些服务。
目前,jkes-services提供了以下服务:

  • jkes-delete-connector
    • jkes-delete-connector是一个Kafka Connector,用于从kafka集群获取索引删除事件(DeleteEvent),然后使用Jest Client删除ElasticSearch中相应的文档。
    • 借助于Kafka Connect的rest admin
      api,我们轻松地实现了多租户平台上的文档删除功能。只要为每个项目启动一个jkes-delete-connector,就可以自动处理该项目的文档删除工作。避免了每启动一个新的项目,我们都得手动启动一个Kafka
      Consumer来处理该项目的文档删除工作。尽管可以通过正则订阅来减少这样的工作,但是还是非常不灵活
  • jkes-search-service
    • jkes-search-service是一个restful的搜索服务,提供了多版本的rest
      query api。查询服务提供多版本API,用于API进化和兼容
    • jkes-search-service目前支持URI风格的搜索和JSON请求体风格的搜索。
    • 我们没有直接使用ElasticSearch进行查询,因为我们需要在后续版本使用机器学习进行搜索排序,而直接与ElasticSearch进行耦合,会增加搜索排序的接入难度
    • 查询服务是一个Spring Boot Application,使用docker打包为镜像
    • 查询服务解析json请求,进行一些预处理后,使用ElasticSearch Java Rest Client转发到ElasticSearch,将得到的响应进行解析,进一步处理后返回到客户端。
    • 为了便于客户端人员开发,查询服务提供了一个查询UI界面,开发人员可以在这个页面得到预期结果后再把json请求体复制到程序中。

后续,我们将会基于zookeeper构建索引集群,提供集群索引管理功能

搜索

在前面,已经介绍了在ElasticSearch索引中处理数据的基础知识,现在是时候进行核心功能的学习了。考虑到之前我们删除索引中的所有文档,所以,在进行搜索学习之前,需要一些添加一些示例数据。使用以下这些请求和数据对象来创建索引。

curl -XPUT "http://localhost:9200/movies/movie/1" -d'
{
    "title": "The Godfather",
    "director": "Francis Ford Coppola",
    "year": 1972,
    "genres": ["Crime", "Drama"]
}'

curl -XPUT "http://localhost:9200/movies/movie/2" -d'
{
    "title": "Lawrence of Arabia",
    "director": "David Lean",
    "year": 1962,
    "genres": ["Adventure", "Biography", "Drama"]
}'

curl -XPUT "http://localhost:9200/movies/movie/3" -d'
{
    "title": "To Kill a Mockingbird",
    "director": "Robert Mulligan",
    "year": 1962,
    "genres": ["Crime", "Drama", "Mystery"]
}'

curl -XPUT "http://localhost:9200/movies/movie/4" -d'
{
    "title": "Apocalypse Now",
    "director": "Francis Ford Coppola",
    "year": 1979,
    "genres": ["Drama", "War"]
}'

curl -XPUT "http://localhost:9200/movies/movie/5" -d'
{
    "title": "Kill Bill: Vol. 1",
    "director": "Quentin Tarantino",
    "year": 2003,
    "genres": ["Action", "Crime", "Thriller"]
}'

curl -XPUT "http://localhost:9200/movies/movie/6" -d'
{
    "title": "The Assassination of Jesse James by the Coward Robert Ford",
    "director": "Andrew Dominik",
    "year": 2007,
    "genres": ["Biography", "Crime", "Drama"]
}'

值得指出的是,ElasticSearch具有和端点(_bulk)用于用单个请求索引多个文档,但是这超出了本教程的范围,这里只保持简单,使用六个单独的请求学习。

jkes-integration-test

jkes-integration-test是一个基于Spring
Boot集成测试项目,用于进行功能测试。同时测量一些常见操作的吞吐率

_search端点

现在已经把一些电影信息放入了索引,可以通过搜索看看是否可找到它们。
为了使用ElasticSearch进行搜索,我们使用_search端点,可选择使用索引和类型。也就是说,按照以下模式向URL发出请求:<index>/<type>/_search。其中,indextype都是可选的。

换句话说,为了搜索电影,可以对以下任一URL进行POST请求:

  • http://localhost:9200/_search –
    搜索所有索引和所有类型。
  • http://localhost:9200/movies/_search –
    在电影索引中搜索所有类型
  • http://localhost:9200/movies/movie/_search –
    在电影索引中显式搜索电影类型的文档。

因为我们只有一个单一的索引和单一的类型,所以怎么使用都不会有什么问题。为了简洁起见使用第一个URL。

开发

To build a development version you’ll need a recent version of Kafka.
You can build jkes with Maven using the standard lifecycle phases.

搜索请求正文和ElasticSearch查询DSL

如果只是发送一个请求到上面的URL,我们会得到所有的电影信息。为了创建更有用的搜索请求,还需要向请求正文中提供查询。
请求正文是一个JSON对象,除了其它属性以外,它还要包含一个名称为“query”的属性,这就可使用ElasticSearch的查询DSL。

{
    "query": {
        //Query DSL here
    }
}

你可能想知道查询DSL是什么。它是ElasticSearch自己基于JSON的域特定语言,可以在其中表达查询和过滤器。想象ElasticSearch它像关系数据库的SQL。这里是ElasticSearch自己的文档解释它的一部分(英文好自己撸吧):

Think of the Query DSL as an AST of queries. Certain queries can
contain other queries (like the bool query), other can contain filters
(like the constant_score), and some can contain both a query and a
filter (like the filtered). Each of those can contain any query of the
list of queries or any filter from the list of filters, resulting in
the ability to build quite complex (and interesting) queries. see
more:
http://www.elasticsearch.org/guide/reference/query-dsl/

Contribute

  • Source Code:
  • Issue Tracker:

基本自由文本搜索

查询DSL具有一长列不同类型的查询可以使用。
对于“普通”自由文本搜索,最有可能想使用一个名称为“查询字符串查询”。

查询字符串查询是一个高级查询,有很多不同的选项,ElasticSearch将解析和转换为更简单的查询树。如果忽略了所有的可选参数,并且只需要给它一个字符串用于搜索,它可以很容易使用。

现在尝试在两部电影的标题中搜索有“kill”这个词的电影信息:

curl -XPOST "http://localhost:9200/_search" -d'
{
    "query": {
        "query_string": {
            "query": "kill"
        }
    }
}'

执行上面的请求并查看结果,如下所示 –

澳门新葡萄京官网注册 7

正如预期的,得到两个命中结果,每个电影的标题中都带有“kill”单词。再看看另一种情况,在特定字段中搜索。

LICENSE

This project is licensed under Apache License 2.0.

指定搜索的字段

在前面的例子中,使用了一个非常简单的查询,一个只有一个属性“query”的查询字符串查询。
如前所述,查询字符串查询有一些可以指定设置,如果不使用,它将会使用默认的设置值。

这样的设置称为“fields”,可用于指定要搜索的字段列表。如果不使用“fields”字段,ElasticSearch查询将默认自动生成的名为“_all”的特殊字段,来基于所有文档中的各个字段匹配搜索。

为了做到这一点,修改以前的搜索请求正文,以便查询字符串查询有一个fields属性用来要搜索的字段数组:

curl -XPOST "http://localhost:9200/_search" -d'
{
    "query": {
        "query_string": {
            "query": "ford",
            "fields": ["title"]
        }
    }
}'

执行上面查询它,看看会有什么结果(应该只匹配到 1 行数据):

澳门新葡萄京官网注册 8

正如预期的得到一个命中,电影的标题中的单词“ford”。现在,从查询中移除fields属性,应该能匹配到
3 行数据:

澳门新葡萄京官网注册 9

过滤

前面已经介绍了几个简单的自由文本搜索查询。现在来看看另一个示例,搜索“drama”,不明确指定字段,如下查询

curl -XPOST "http://localhost:9200/_search" -d'
{
    "query": {
        "query_string": {
            "query": "drama"
        }
    }
}'

因为在索引中有五部电影在_all字段(从类别字段)中包含单词“drama”,所以得到了上述查询的5个命中。
现在,想象一下,如果我们想限制这些命中为只是1962年发布的电影。要做到这点,需要应用一个过滤器,要求“year”字段等于1962

要添加过滤器,修改搜索请求正文,以便当前的顶级查询(查询字符串查询)包含在过滤的查询中:

{
    "query": {
        "filtered": {
            "query": {
                "query_string": {
                    "query": "drama"
                }
            },
            "filter": {
                //Filter to apply to the query
            }
        }
    }
}

过滤的查询是具有两个属性(queryfilter)的查询。执行时,它使用过滤器过滤查询的结果。要完成这样的查询还需要添加一个过滤器,要求year字段的值为1962

ElasticSearch查询DSL有各种各样的过滤器可供选择。对于这个简单的情况,某个字段应该匹配一个特定的值,一个条件过滤器就能很好地完成工作。

"filter": {
    "term": { "year": 1962 }
}

完整的搜索请求如下所示:

curl -XPOST "http://localhost:9200/_search" -d'
{
    "query": {
        "filtered": {
            "query": {
                "query_string": {
                    "query": "drama"
                }
            },
            "filter": {
                "term": { "year": 1962 }
            }
        }
    }
}'

当执行上面请求,只得到两个命中,这个两个命中的数据的 year
字段的值都是等于 1962

无需查询即可进行过滤

在上面的示例中,使用过滤器限制查询字符串查询的结果。如果想要做的是应用一个过滤器呢?
也就是说,我们希望所有电影符合一定的标准。

在这种情况下,我们仍然在搜索请求正文中使用“query”属性。但是,我们不能只是添加一个过滤器,需要将它包装在某种查询中。

一个解决方案是修改当前的搜索请求,替换查询字符串 query
过滤查询中的match_all查询,这是一个查询,只是匹配一切。类似下面这个:

curl -XPOST "http://localhost:9200/_search" -d'
{
    "query": {
        "filtered": {
            "query": {
                "match_all": {
                }
            },
            "filter": {
                "term": { "year": 1962 }
            }
        }
    }
}'

另一个更简单的方法是使用常数分数查询:

curl -XPOST "http://localhost:9200/_search" -d'
{
    "query": {
        "constant_score": {
            "filter": {
                "term": { "year": 1962 }
            }
        }
    }
}'

参考:http://www.yiibai.com/elasticsearch/elasticsearch-getting-start.html

发表评论

电子邮件地址不会被公开。 必填项已用*标注