Py-API
Python-ES
- connect cluster:
from elasticsearch import Elasticsearch
from elasticsearch import Elasticsearch
# Simplest connection: a client pointed at "localhost".
elastic = Elasticsearch("localhost")

# Try to make a custom connection that sniffs out node and cluster issues.
try:
    # custom host with sniffing turned on
    elastic = Elasticsearch(
        ["{YOUR_DOMAIN}:7500"],
        sniff_on_start=True,
        sniff_on_connection_fail=True,
        sniffer_timeout=60,
    )
except Exception as error:
    print("Elasticsearch Client Error:", error)
    # make a simple default connection if the custom one failed
    elastic = Elasticsearch()

# Show which client instance ended up being used and what it exposes.
print("\nELASTICSEARCH INSTANCE:", elastic)
print("CLIENT ATTRIBUTES:", dir(elastic))
Python Snippets
-
修改最大10000限制
# Raise the index's max_result_window so from/size paging can go past the
# 10000 limit.
window_settings = {"index": {"max_result_window": 1000000}}
es.indices.put_settings(index=self.index, body=window_settings)

# Fetch one page: `page` is 1-based, so page 1 starts at offset 0.
offset = (page - 1) * size
es.search(
    index=self.index,
    body=query_body,
    from_=offset,
    size=size,
)
-
返回结果filter
# Same paged search, with filter_path restricting the response to 'col1'.
offset = (page - 1) * size
es.search(
    index=self.index,
    body=query_body,
    from_=offset,
    size=size,
    filter_path=['col1'],
)
-
es-CRUD
class ESBase:
    """CRUD and search helpers bound to one index / doc type.

    Every instance (and the ``create_index`` classmethod) goes through the
    single class-level client ``es``.
    """

    # Shared client, created once when the class body is executed.
    es = Elasticsearch(uris)

    def __init__(self, index=None, doc_type=None):
        """Remember the index and document type the helpers operate on.

        :param index: index name used by every call on this instance
        :param doc_type: document type forwarded to calls that accept one
        """
        self.index = index
        self.doc_type = doc_type

    @exception_handler
    def insert_data_bulk(self, data):
        """
        bulk insert data list.

        :param data: list
        :return: the result of ``helpers.bulk`` (returned for consistency
            with ``update_by_bulk`` / ``delete_by_bulk``)
        ex:
            [{
                "_id": uuid.uuid4(),  # random UUID for _id
                "doc_type": "person",  # document _type
                "tag": {  # the body of the document
                    "name": "George Peterson",
                    "sex": "male",
                    "age": 34,
                    "years": x
                }
            } for x in range(2)]
        """
        return helpers.bulk(
            self.es, data, index=self.index, doc_type=self.doc_type
        )

    @classmethod
    def create_index(cls, index):
        """Create ``index`` on the shared client.

        ``ignore=400`` is passed to the client -- presumably so that an
        "index already exists" response does not raise; confirm against the
        client version in use.
        """
        cls.es.indices.create(index=index, ignore=400)

    @exception_handler
    def insert_one_data(self, data, _id=None):
        """
        insert one data, if exist, then update.

        :param data: dict
        :param _id: optional document id forwarded to the client
        :return: the client's response to the index call
        ex:
            {
                "test": 12345,
                "name": "dean"
            }
        """
        res = self.es.index(
            index=self.index, body=data, doc_type=self.doc_type, id=_id
        )
        return res

    def search_by_id(self, _id):
        """Fetch the document with id ``_id`` from this instance's index."""
        res = self.es.get(index=self.index, id=_id)
        return res

    def search_by_query(self, query):
        """Run ``query`` (a full search body) against this instance's index."""
        res = self.es.search(index=self.index, body=query)
        return res

    @exception_handler
    def update_by_id(self, _id, update_data):
        """
        update with data, doc is the default.

        :param _id: the doc id
        :param update_data: the data to update
        :return: the client's response to the update call
        ex:
            {
                "doc": {"tag": {"age": 10}}
            }
            update tag.age: 10
        """
        res = self.es.update(
            index=self.index,
            id=_id,
            doc_type=self.doc_type,
            body=update_data,
        )
        return res

    @exception_handler
    def update_by_query(self, query, field, update_data):
        """
        Set ``field`` to ``update_data`` on every document matching ``query``.

        :param query: match clause selecting the documents
        :param field: name (or dotted path) of the source field to assign
        :param update_data: the new value; it is sent as a script parameter,
            so it is stored as a literal value and never parsed as script code
        :return: the client's response to the update-by-query call
        :ex:
            query = {
                "name": "dean"
            }
            field = "age"
            update_data = 30
        """
        # The value travels in ``params`` instead of being formatted into the
        # script text: strings, booleans and None would otherwise produce an
        # invalid script, and arbitrary text could inject script code.
        # NOTE(review): "inline" is kept from the original; newer
        # Elasticsearch documentation names this key "source" -- confirm
        # against the target cluster version.
        _inline = "ctx._source.{field} = params.value".format(field=field)
        _query = {
            "script": {
                "inline": _inline,
                "params": {"value": update_data},
            },
            "query": {
                "match": query
            }
        }
        res = self.es.update_by_query(body=_query, index=self.index)
        return res

    @exception_handler
    def update_by_bulk(self, _id, update_data):
        """Send a single ``update`` action for ``_id`` through ``helpers.bulk``."""
        # TODO: needs abstraction and testing.
        action = [{
            "_id": _id,
            "_type": self.doc_type,
            "_index": self.index,
            "_source": {"doc": update_data},
            "_op_type": 'update'
        }]
        res = helpers.bulk(self.es, action)
        return res

    @exception_handler
    def delete_by_bulk(self, _id):
        """Send a single ``delete`` action for ``_id`` through ``helpers.bulk``."""
        # TODO: needs abstraction and verification.
        action = [
            {
                '_op_type': 'delete',
                '_index': self.index,
                '_type': self.doc_type,
                '_id': _id,
            }
        ]
        res = helpers.bulk(self.es, action)
        return res

    @exception_handler
    def delete_index(self):
        """Delete this instance's index, passing ``ignore=[400, 404]``."""
        res = self.es.indices.delete(index=self.index, ignore=[400, 404])
        return res

    @exception_handler
    def delete_by_query(self, query):
        """
        delete with query.

        :param query: match clause selecting the documents to delete
        ex:
            es.delete_by_query({"doc.age":34})
            or
            es.delete_by_query({"name":34})
        """
        res = self.es.delete_by_query(
            index=self.index,
            body={"query": {"match": query}},
            _source=True
        )
        return res

    @exception_handler
    def delete_by_id(self, _id):
        """Delete the document with id ``_id`` from this index / doc type."""
        res = self.es.delete(
            index=self.index, doc_type=self.doc_type, id=_id
        )
        return res

    def full_text_search(self, keyword, field):
        """Run a ``match`` query for ``keyword`` on ``field``.

        Original author's note: you need to search in a particular field,
        and the keyword must be a word, not a letter.
        """
        res = self.es.search(
            index=self.index,
            body={
                "query": {
                    "match": {
                        field: keyword
                    }
                }
            }
        )
        return res

    def regex_search(self, pat, field):
        """Run a ``regexp`` query with pattern ``pat`` on ``field``."""
        res = self.es.search(
            index=self.index,
            body={
                "query": {
                    "regexp": {
                        field: pat
                    }
                }
            }
        )
        return res

    def prefix_search(self, prefix, field):
        """Run a ``prefix`` query for ``prefix`` on ``field``."""
        res = self.es.search(
            index=self.index,
            body={
                "query": {
                    "prefix": {
                        field: prefix
                    }
                }
            }
        )
        return res