1 #!/usr/bin/env python
2 # -*- coding:utf-8 -*-
3 # *************************************
4 # @Time : 2019/8/12
5 # @Author : Zhang Fan
6 # @Desc : Library
7 # @File : MyDatabases.py
8 # @Update : 2019/8/23
9 # *************************************
import re

import elasticsearch
import phoenixdb
import pymysql
import pysolr
14
15
class MyELS(object):
    """Small helper around an ``elasticsearch.Elasticsearch`` client.

    Call ``connect_to_els`` first; ``get_els_data`` then uses the stored
    client to run searches.
    """

    def __init__(self):
        # Client object created by connect_to_els(); None until then.
        self.els_conn = None

    def connect_to_els(self, host, port):
        """Create an Elasticsearch client for the given host and port.

        The client is stored on ``self.els_conn`` and echoed to stdout.

        :param host: host name or address handed to the client.
        :param port: port handed to the client.
        """
        self.els_conn = elasticsearch.Elasticsearch([{'host': host, 'port': port}])
        print('Executing : Connect To Elastic Search | %s' % self.els_conn)

    def get_els_data(self, query, index):
        """Search ``index`` with ``query`` and return the ``hits`` section.

        :param query: value passed to the client's ``search`` as ``q``.
        :param index: value passed to the client's ``search`` as ``index``.
        :returns: the ``'hits'`` entry of the search response.
        :raises Exception: any error raised while searching or reading the
            response is printed and then re-raised with its original type.
        """
        print('Executing : Search | %s' % query)
        try:
            response = self.els_conn.search(index=index, q=query)
            return response['hits']
        except Exception as e:
            print('Elastic Search Error | %s' % e)
            # Re-raise as-is: wrapping in a generic Exception would hide the
            # concrete error type from callers.
            raise
43
44
class MyPhoenix(object):
    """Small helper around a ``phoenixdb`` connection and cursor.

    Call ``connect_to_phoenix`` first; the other methods use the stored
    cursor, and ``disconnect_from_phoenix`` releases both handles.
    """

    def __init__(self):
        # Connection and cursor created by connect_to_phoenix(); None until then.
        self.phoenix_conn = None
        self.phoenix_cursor = None

    def connect_to_phoenix(self, host, port=8765):
        """Connect to ``http://<host>:<port>/`` and open a cursor.

        The connection is opened with ``autocommit=True``.

        :param host: host name or address used to build the URL.
        :param port: port used to build the URL (defaults to 8765).
        """
        address = 'http://{0}:{1}/'.format(host, port)
        print('Executing : Connect To Phoenix | %s' % address)
        self.phoenix_conn = phoenixdb.connect(address, autocommit=True)
        self.phoenix_cursor = self.phoenix_conn.cursor()

    def set_schema(self, sql, schema):
        """Prefix the text after the first ``FROM`` keyword with ``schema.``.

        The keyword is matched case-insensitively as a whole word, and the
        rest of the statement keeps its original case so that string literals
        are not altered.

        :param sql: SQL statement text.
        :param schema: schema name to insert after ``FROM``.
        :returns: the rewritten statement, or ``sql`` unchanged when it
            contains no ``FROM`` keyword.
        """
        match = re.search(r'\bFROM\b', sql, flags=re.IGNORECASE)
        if match is None:
            # Nothing to qualify (e.g. an UPSERT); leave the statement alone.
            return sql
        head = sql[:match.start()]
        tail = sql[match.end():].strip()
        return ''.join([head, match.group(0), ' ', schema, '.', tail])

    def execute_phoenix_sql(self, sql):
        """Execute ``sql`` on the stored cursor without fetching results.

        :param sql: SQL statement text.
        :raises Exception: any error from the cursor is printed and then
            re-raised with its original type.
        """
        print('Executing : Execute | %s' % sql)
        try:
            self.phoenix_cursor.execute(sql)
        except Exception as e:
            # Same reporting as get_from_phoenix, then propagate unchanged.
            print('Phoenix Error | %s' % e)
            raise

    def get_from_phoenix(self, sql):
        """Execute ``sql`` and return everything the cursor's ``fetchall`` yields.

        :param sql: SQL statement text.
        :returns: the result of ``fetchall()`` on the stored cursor.
        :raises Exception: an error raised by ``execute`` is printed and then
            re-raised with its original type.
        """
        print('Executing : Query | %s' % sql)
        try:
            self.phoenix_cursor.execute(sql)
        except Exception as e:
            print('Phoenix Error | %s' % e)
            # Re-raise as-is so callers can still tell error types apart.
            raise
        return self.phoenix_cursor.fetchall()

    def disconnect_from_phoenix(self):
        """Close the cursor and the connection, if any, and forget both.

        Safe to call when nothing is connected. The connection is closed even
        if closing the cursor raises; that error then propagates.
        """
        print('Executing : Disconnect From HBase')
        cursor, conn = self.phoenix_cursor, self.phoenix_conn
        # Drop the references first so a repeated call is a no-op.
        self.phoenix_cursor = None
        self.phoenix_conn = None
        try:
            if cursor is not None:
                cursor.close()
        finally:
            if conn is not None:
                conn.close()
101
102
class MySolr(object):
    """Small helper around a ``pysolr.Solr`` client.

    Call ``connect_to_solr`` first; the data methods use the stored client.
    """

    def __init__(self):
        # Client and its base URL, both set by connect_to_solr(); None until then.
        self.solr_conn = None
        self.base_url = None

    def connect_to_solr(self, address, selector):
        """Create a Solr client for ``http://<address>/solr/<selector>/``.

        The URL is stored on ``self.base_url`` and the client on
        ``self.solr_conn``.

        :param address: first path-independent part of the URL.
        :param selector: path segment placed after ``/solr/``.
        """
        self.base_url = 'http://{0}/solr/{1}/'.format(address, selector)
        self.solr_conn = pysolr.Solr(self.base_url)
        print('Executing : Connect To Solr | %s' % self.base_url)

    def get_solr_data(self, query):
        """Search with ``query`` and return the results as a list.

        :param query: value passed to the client's ``search``.
        :returns: a list holding every item yielded by the search result.
        :raises Exception: any error raised while searching or iterating is
            printed and then re-raised with its original type.
        """
        print('Executing : Search | %s' % query)
        try:
            return list(self.solr_conn.search(query))
        except Exception as e:
            print('Solr Error | %s' % e)
            # Re-raise as-is so callers can still tell error types apart.
            raise

    def add_solr_data(self, data):
        """Add a single document and commit.

        :param data: document passed to the client's ``add`` inside a
            one-element list.
        :raises Exception: any error from ``add`` or ``commit`` is printed and
            then re-raised with its original type.
        """
        print('Executing : add | %s' % data)
        try:
            self.solr_conn.add([data])
            self.solr_conn.commit()
        except Exception as e:
            print('Solr Error | %s' % e)
            raise

    def del_solr_byId(self, data):
        """Delete by id and commit.

        :param data: value passed to the client's ``delete`` as ``id``.
        :raises Exception: any error from ``delete`` or ``commit`` is printed
            and then re-raised with its original type.
        """
        print('Executing : del | %s' % data)
        try:
            self.solr_conn.delete(id=data)
            self.solr_conn.commit()
        except Exception as e:
            print('Solr Error | %s' % e)
            raise
158
159
if __name__ == '__main__':
    # Minimal smoke run: announce the test and build one instance of each
    # helper class (none of the constructors connect to anything).
    print('This is test.')
    ms, me, mp = MySolr(), MyELS(), MyPhoenix()
# Python 调用 ES、Solr、Phoenix (Calling Elasticsearch, Solr and Phoenix from Python)
# 点赞 (Like)
# 收藏 (Favorite)