Skip to content

Py-API

Python-ES

  • connect cluster:
from elasticsearch import Elasticsearch

from elasticsearch import Elasticsearch
elastic = Elasticsearch("localhost")

'''
try makingto make a custom connection that
will sniffs out node and cluster issues
'''
try:
    # custom host with sniffing turned on
    elastic = Elasticsearch(
        ["{YOUR_DOMAIN}:7500"],
        sniff_on_start=True,
        sniff_on_connection_fail=True,
        sniffer_timeout=60
        )
except Exception as error:
    print ("Elasticsearch Client Error:", error)
    # make asimple default connection if error
    elastic = Elasticsearch()`

print ("\nELASTICSEARCH INSTANCE:", elastic)
print ("CLIENT ATTRIBUTES:", dir(elastic))

Python Snippts

  • 修改最大10000限制

    es.indices.put_settings(index=self.index,
        body={"index": {"max_result_window": 1000000 }})
    
    es.search(
        index=self.index,
        body=query_body,
        from_=(page-1) * size,
        size=size
            )
    

  • 返回结果filter

    es.search(
        index=self.index,
        body=query_body,
        from_=(page-1) * size,
        size=size,
        filter_path=['col1']
            )
    

  • es-CRUD

    class ESBase:
        es = Elasticsearch(uris)
    
        def __init__(self, index=None, doc_type=None):
            self.index = index
            self.doc_type = doc_type
    
        @exception_handler
        def insert_data_bulk(self, data):
            """
            bulk insert data list.
            :param data: list
            :return:
            ex:
            [{
                "_id": uuid.uuid4(),  # random UUID for _id
                "doc_type": "person",  # document _type
                "tag": {  # the body of the document
                    "name": "George Peterson",
                    "sex": "male",
                    "age": 34,
                    "years": x
                }
            } for x in range(2)]
            """
            helpers.bulk(self.es, data, index=self.index, doc_type=self.doc_type)
    
        @classmethod
        def create_index(cls, index):
            cls.es.indices.create(index=index, ignore=400)
    
        @exception_handler
        def insert_one_data(self, data, _id=None):
            """
            insert one data, if exist, then update.
            :param data: dict
            :return:
            {
                "test": 12345,
                "name": "dean"
            }
            """
            res = self.es.index(index=self.index, body=data, doc_type=self.doc_type, id=_id)
            return res
    
        def search_by_id(self, _id):
            res = self.es.get(
                index=self.index,
                id=_id
            )
            return res
    
        def search_by_query(self, query):
            res = self.es.search(
                index=self.index,
                body=query)
            return res
    
        @exception_handler
        def update_by_id(self, _id, update_data):
            """
            update with data, doc is the default.
            :param _id: the doc id
            :param update_data: the data to update
            :return:
            ex: {
                "doc": {"tag": {"age": 10}}
            }
            update tag.age: 10
            """
            res = self.es.update(
                index=self.index,
                id=_id,
                doc_type=self.doc_type,
                body=update_data
            )
            return res
    
        @exception_handler
        def update_by_query(self, query, field, update_data):
            """
    
            :param query: select from index
            :param field:
            :param update_data:
            :return:
            :ex:
            query = {
                "name": "dean"
            }
            field = "age"
            update_data = 30
            """
            _inline = "ctx._source.{field}={update_data}".format(field=field, update_data=update_data)
            _query = {
                "script": {
                    "inline": _inline,
                },
                "query": {
                    "match": query
                }
            }
            res = self.es.update_by_query(body=_query, index=self.index)
            return res
    
        @exception_handler
        def update_by_bulk(self, _id, update_data):
            # todo:  需要抽象 and test.
            action = [{
                "_id": _id,
                "_type": self.doc_type,
                "_index": self.index,
                "_source": {"doc": update_data},
                "_op_type": 'update'
            }]
            res = helpers.bulk(self.es, action)
            return res
    
        @exception_handler
        def delete_by_bulk(self, _id):
            # todo: 需要抽象和验证
            action = [
                {
                    '_op_type': 'delete',
                    '_index': self.index,
                    '_type': self.doc_type,
                    '_id': _id,
                }
            ]
            res = helpers.bulk(self.es, action)
            return res
    
        @exception_handler
        def delete_index(self):
            res = self.es.indices.delete(index=self.index, ignore=[400, 404])
            return res
    
        @exception_handler
        def delete_by_query(self, query):
            """
            delete with query.
            :param query:
            ex: es.delete_by_query({"doc.age":34}) or es.delete_by_query({"name":34})
            """
            res = self.es.delete_by_query(
                index=self.index,
                body={"query": {"match": query}},
                _source=True
            )
            return res
    
        @exception_handler
        def delete_by_id(self, _id):
            res = self.es.delete(index=self.index, doc_type=self.doc_type, id=_id)
            return res
    
        # you need search in particular field and keyword must be word, not letter
        def full_text_search(self, keyword, field):
            res = self.es.search(
                index=self.index,
                body={
                    "query": {
                        "match": {
                            field: keyword
                        }
                    }
                }
            )
            return res
    
        def regex_search(self, pat, field):
            res = self.es.search(
                index=self.index,
                body={
                    "query": {
                        "regexp": {
                            field: pat
                        }
                    }
                }
            )
            return res
    
        def prefix_search(self, prefix, field):
            res = self.es.search(
                index=self.index,
                body={
                    "query": {
                        "prefix": {
                            field: prefix
                        }
                    }
                }
            )
            return res