ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

python-如何使用elasticsearch.helpers.streaming_bulk

2019-10-10 17:57:42  阅读:540  来源: 互联网

标签:helpers bulk python elasticsearch


有人可以建议如何使用函数elasticsearch.helpers.streaming_bulk代替elasticsearch.helpers.bulk将数据索引到elasticsearch中.

如果我只是简单地更改streaming_bulk而不是批量,则不会索引任何内容,因此我猜它需要以其他形式使用.

下面的代码从CSV文件中以500个元素的块创建索引,类型和索引数据,并进入elasticsearch.它工作正常,但是我在徘徊是否可以提高性能.这就是为什么我想尝试streaming_bulk函数的原因.

目前,我需要10分钟才能为200MB的CSV文档索引100万行.我使用两台机器,Centos 6.6,8个CPU,x86_64,CPU MHz:2499.902,内存:15.574G.
不确定它可以更快地进行.

es = elasticsearch.Elasticsearch([{'host': 'uxmachine-test', 'port': 9200}])
index_name = 'new_index'
type_name = 'new_type'
mapping = json.loads(open(config["index_mapping"]).read()) #read mapping from json file

es.indices.create(index_name)
es.indices.put_mapping(index=index_name, doc_type=type_name, body=mapping)

with open(file_to_index, 'rb') as csvfile:
    reader = csv.reader(csvfile)        #read documents for indexing from CSV file, more than million rows
    content = {"_index": index_name, "_type": type_name}
    batch_chunks = []
    iterator = 0

    for row in reader:
        var = transform_row_for_indexing(row,fields, index_name, type_name,id_name,id_increment)
        id_increment = id_increment + 1
        #var = transform_row_for_indexing(row,fields, index_name, type_name)
        batch_chunks.append(var)
        if iterator % 500 == 0:
            helpers.bulk(es,batch_chunks)
            del batch_chunks[:]
            print "ispucalo batch"
        iterator = iterator + 1
    # indexing of last batch_chunk
    if len(batch_chunks) != 0:
        helpers.bulk(es,batch_chunks)

解决方法:

因此,流式批量返回一个interator.这意味着除非您开始对其进行迭代,否则将不会发生任何事情. “批量”功能的代码如下所示:

success, failed = 0, 0

# list of errors to be collected is not stats_only
errors = []

for ok, item in streaming_bulk(client, actions, **kwargs):
    # go through request-reponse pairs and detect failures
    if not ok:
        if not stats_only:
            errors.append(item)
        failed += 1
    else:
        success += 1

return success, failed if stats_only else errors

因此,基本上只调用streaming_bulk(client,actions,** kwargs)实际上不会做任何事情.直到像本for循环中那样对它进行迭代,索引才真正开始发生.

因此,在您的代码中.欢迎您将“批量”更改为“ streaming_bulk”,但是您需要遍历流批量的结果才能真正索引任何内容.

标签:helpers,bulk,python,elasticsearch
来源: https://codeday.me/bug/20191010/1887925.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有