ConnectionPool实现redis在python中的连接

Stella981
• 阅读 781

这篇文章主要介绍了Python与Redis的连接教程,Redis是一个高性能的基于内存的数据库,需要的朋友可以参考下

今天在写zabbix storm job监控脚本的时候用到了python的redis模块,之前也有用过,但是没有过多的了解,今天看了下相关的api和源码,看到有ConnectionPool的实现,这里简单说下。 在ConnectionPool之前,如果需要连接redis,我都是用StrictRedis这个类,在源码中可以看到这个类的具体解释:   redis.StrictRedis Implementation of the Redis protocol.This abstract class provides a Python interface to all Redis commands and an implementation of the Redis protocol.Connection and Pipeline derive from this, implementing how the commands are sent and received to the Redis server

使用的方法:  

?

1

2

r = redis.StrictRedis(host = xxxx, port = xxxx, db = xxxx)

r.xxxx()

有了ConnectionPool这个类之后,可以使用如下方法

?

1

2

pool = redis.ConnectionPool(host = xxx, port = xxx, db = xxxx)

r = redis.Redis(connection_pool = pool)

这里Redis是StrictRedis的子类 简单分析如下: 在StrictRedis类的__init__方法中,可以初始化connection_pool这个参数,其对应的是一个ConnectionPool的对象:  

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

class StrictRedis( object ):

........

def __init__( self , host = 'localhost' , port = 6379 ,

db = 0 , password = None , socket_timeout = None ,

socket_connect_timeout = None ,

socket_keepalive = None , socket_keepalive_options = None ,

connection_pool = None , unix_socket_path = None ,

encoding = 'utf-8' , encoding_errors = 'strict' ,

charset = None , errors = None ,

decode_responses = False , retry_on_timeout = False ,

ssl = False , ssl_keyfile = None , ssl_certfile = None ,

ssl_cert_reqs = None , ssl_ca_certs = None ):

if not connection_pool:

..........

connection_pool = ConnectionPool( * * kwargs)

self .connection_pool = connection_pool

在StrictRedis的实例执行具体的命令时会调用execute_command方法,这里可以看到具体实现是从连接池中获取一个具体的连接,然后执行命令,完成后释放连接:

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

# COMMAND EXECUTION AND PROTOCOL PARSING

def execute_command( self , * args, * * options):

"Execute a command and return a parsed response"

pool = self .connection_pool

command_name = args[ 0 ]

connection = pool.get_connection(command_name, * * options) #调用ConnectionPool.get_connection方法获取一个连接

try :

connection.send_command( * args) #命令执行,这里为Connection.send_command

return self .parse_response(connection, command_name, * * options)

except (ConnectionError, TimeoutError) as e:

connection.disconnect()

if not connection.retry_on_timeout and isinstance (e, TimeoutError):

raise

connection.send_command( * args)

return self .parse_response(connection, command_name, * * options)

finally :

pool.release(connection) #调用ConnectionPool.release释放连接

在来看看ConnectionPool类:

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

class ConnectionPool( object ):

...........

def __init__( self , connection_class = Connection, max_connections = None ,

* * connection_kwargs): #类初始化时调用构造函数

max_connections = max_connections or 2 * * 31

if not isinstance (max_connections, ( int , long )) or max_connections < 0 : #判断输入的max_connections是否合法

raise ValueError( '"max_connections" must be a positive integer' )

self .connection_class = connection_class #设置对应的参数

self .connection_kwargs = connection_kwargs

self .max_connections = max_connections

self .reset() #初始化ConnectionPool 时的reset操作

def reset( self ):

self .pid = os.getpid()

self ._created_connections = 0 #已经创建的连接的计数器

self ._available_connections = [] #声明一个空的数组,用来存放可用的连接

self ._in_use_connections = set () #声明一个空的集合,用来存放已经在用的连接

self ._check_lock = threading.Lock()

.......

def get_connection( self , command_name, * keys, * * options): #在连接池中获取连接的方法

"Get a connection from the pool"

self ._checkpid()

try :

connection = self ._available_connections.pop() #获取并删除代表连接的元素,在第一次获取connectiong时,因为_available_connections是一个空的数组,

会直接调用make_connection方法

except IndexError:

connection = self .make_connection()

self ._in_use_connections.add(connection) #向代表正在使用的连接的集合中添加元素

return connection

def make_connection( self ): #在_available_connections数组为空时获取连接调用的方法

"Create a new connection"

if self ._created_connections > = self .max_connections: #判断创建的连接是否已经达到最大限制,max_connections可以通过参数初始化

raise ConnectionError( "Too many connections" )

self ._created_connections + = 1  #把代表已经创建的连接的数值+1

return self .connection_class( * * self .connection_kwargs) #返回有效的连接,默认为Connection(**self.connection_kwargs)

def release( self , connection): #释放连接,链接并没有断开,只是存在链接池中

"Releases the connection back to the pool"

self ._checkpid()

if connection.pid ! = self .pid:

return

self ._in_use_connections.remove(connection) #从集合中删除元素

self ._available_connections.append(connection) #并添加到_available_connections 的数组中

def disconnect( self ): #断开所有连接池中的链接

"Disconnects all connections in the pool"

all_conns = chain( self ._available_connections,

self ._in_use_connections)

for connection in all_conns:

connection.disconnect()

execute_command最终调用的是Connection.send_command方法,关闭链接为 Connection.disconnect方法,而Connection类的实现:  

?

1

2

3

4

5

6

7

class Connection( object ):

"Manages TCP communication to and from a Redis server"

def __del__( self ): #对象删除时的操作,调用disconnect释放连接

try :

self .disconnect()

except Exception:

pass

核心的链接建立方法是通过socket模块实现:

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

def _connect( self ):

err = None

for res in socket.getaddrinfo( self .host, self .port, 0 ,

socket.SOCK_STREAM):

family, socktype, proto, canonname, socket_address = res

sock = None

try :

sock = socket.socket(family, socktype, proto)

# TCP_NODELAY

sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1 )

# TCP_KEEPALIVE

if self .socket_keepalive: #构造函数中默认 socket_keepalive=False,因此这里默认为短连接

sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1 )

for k, v in iteritems( self .socket_keepalive_options):

sock.setsockopt(socket.SOL_TCP, k, v)

# set the socket_connect_timeout before we connect

sock.settimeout( self .socket_connect_timeout) #构造函数中默认socket_connect_timeout=None,即连接为blocking的模式

# connect

sock.connect(socket_address)

# set the socket_timeout now that we're connected

sock.settimeout( self .socket_timeout) #构造函数中默认socket_timeout=None

return sock

except socket.error as _:

err = _

if sock is not None :

sock.close()

.....

关闭链接的方法:  

?

1

2

3

4

5

6

7

8

9

10

11

def disconnect( self ):

"Disconnects from the Redis server"

self ._parser.on_disconnect()

if self ._sock is None :

return

try :

self ._sock.shutdown(socket.SHUT_RDWR) #先shutdown再close

self ._sock.close()

except socket.error:

pass

self ._sock = None

        可以小结如下 1)默认情况下每创建一个Redis实例都会构造出一个ConnectionPool实例,每一次访问redis都会从这个连接池得到一个连接,操作完成后会把该连接放回连接池(连接并没有释放),可以构造一个统一的ConnectionPool,在创建Redis实例时,可以将该ConnectionPool传入,那么后续的操作会从给定的ConnectionPool获得连接,不会再重复创建ConnectionPool。 2)默认情况下没有设置keepalive和timeout,建立的连接是blocking模式的短连接。 3)不考虑底层tcp的情况下,连接池中的连接会在ConnectionPool.disconnect中统一销毁。

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
Jacquelyn38 Jacquelyn38
3年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
待兔 待兔
2个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Stella981 Stella981
2年前
KVM调整cpu和内存
一.修改kvm虚拟机的配置1、virsheditcentos7找到“memory”和“vcpu”标签,将<namecentos7</name<uuid2220a6d1a36a4fbb8523e078b3dfe795</uuid
Stella981 Stella981
2年前
Nginx + lua +[memcached,redis]
精品案例1、Nginxluamemcached,redis实现网站灰度发布2、分库分表/基于Leaf组件实现的全球唯一ID(非UUID)3、Redis独立数据监控,实现订单超时操作/MQ死信操作SelectPollEpollReactor模型4、分布式任务调试Quartz应用
Stella981 Stella981
2年前
Python之time模块的时间戳、时间字符串格式化与转换
Python处理时间和时间戳的内置模块就有time,和datetime两个,本文先说time模块。关于时间戳的几个概念时间戳,根据1970年1月1日00:00:00开始按秒计算的偏移量。时间元组(struct_time),包含9个元素。 time.struct_time(tm_y
Easter79 Easter79
2年前
Twitter的分布式自增ID算法snowflake (Java版)
概述分布式系统中,有一些需要使用全局唯一ID的场景,这种时候为了防止ID冲突可以使用36位的UUID,但是UUID有一些缺点,首先他相对比较长,另外UUID一般是无序的。有些时候我们希望能使用一种简单一些的ID,并且希望ID能够按照时间有序生成。而twitter的snowflake解决了这种需求,最初Twitter把存储系统从MySQL迁移
Wesley13 Wesley13
2年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Python进阶者 Python进阶者
8个月前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这