python落成mysql的读写分离及负荷均衡

       
Oracle数据库有其公司支出的配套rac来得以达成负载均衡,目前已知的最大节点数能到1贰十六个,但是其带来的爱抚资金财产确实是非常高的,并且rac的安宁也并不是特地美丽,尤其是节点繁多的时候。

     
 然而,相对mysql来讲,rac的实用性要比mysql的配套集群软件mysql-cluster要高诸多。因为从互连网了然到意况来看,很少公司在应用mysql-cluster,大大多商家都会选用第贰方代理软件,比方MySQL
Proxy、Mycat、haproxy等,不过这会唤起此外1个标题:单点故障(包罗mysql-cluster:管理节点)。借使要缓慢解决这一个难点,就须求给代理软件搭建集群,在访问量极大的地方下,代理软件的双机或三机集群会成为访问瓶颈,继续增添其节点数,无疑会拉动各地方的花费。

那么,怎么样得以化解那些主题素材吗?

         
化解上述难点,最棒的方法个人认为应该是在程序中落到实处。通过和别的mysql
DBA的关系,也表明了那一个主见。不过通过带来的疑点也就发出了:会不会大增开采开支?对现成的选拔种类做修改会不会更动十分大?会不会增添中期版本进级的难度?等等。

        对于1个架构划设想计优秀的采取种类能够很料定的对答:不会。

        那么怎么算三个架构划设想计非凡的选取种类啊?

     
 一言以蔽之,正是分层合理、功效模块之间耦合性底。以笔者的阅历来讲,系统规划基本上能够分开为以下4层:

       1.  实体层:首要定义一些实体类

       二.  数据层:也能够叫SQL管理层。首要负担跟数据库交互获得数据

       三. 
业务处:首借使依据业务流程及功效分别模块(大概说定义分裂的业务类)

       四.  表现层:呈现最后结果给用户

     
 完成上述意义(mysql的读写分离及负荷均衡),在这八个等级次序中,仅仅涉及到数据层。

严厉来说,对于规划美貌的连串,只提到到1个类的一个函数:在数据层中,一般都会独自划分出一个连接类,并且这些三番五次类中会有3个三番五次函数,须要更改的正是其1函数:在读取连接字符串从前加1个效应函数再次回到供给的主机、ip、端口号等新闻(未有开辟经历的同学只怕知道那段话有点困难)。

       流程图如下:

        Oracle 1

           代码如下:

Oracle,           

import mmap
import json
import random
import mysql.connector
import time

##公有变量
#dbinfos={
#         "db0":{'host':'192.168.42.60','user':'root','pwd':'Abcd1234','my_user':'root','my_pwd':'Abcd.1234',"port":3306,"database":"","role":"RW","weight":10,"status":1},
#         "db1":{'host':'192.168.42.61','user':'root','pwd':'Abcd1234','my_user':'root','my_pwd':'Abcd.1234',"port":3306,,"database":"":"R","weight":20,"status":1}
#         }

dbinfos={}
mmap_file = None
mmap_time=None

##这个函数返回json格式的字符串,也是实现初始化数据库信息的地方
##使用json格式是为了方便数据转换,从字符串---》二进制--》字符串---》字典
##如果采用其它方式共享dbinfos的方法,可以不用此方式
##配置库的地址
def get_json_str1():
    return json.dumps(dbinfos)

##读取配置库中的内容
def get_json_str():
    try:
        global dbinfos
        cnx = mysql.connector.connect(user='root', password='Abcd.1234',
                              host='192.168.42.60',
                              database='rwlb')
        cursor = cnx.cursor()
        cmdString="select * from rwlb"
        cnt=-1
        cursor.execute(cmdString)
        for (host,user,pwd,my_user,my_pwd,role,weight,status,port,db ) in cursor:
            cnt=cnt+1
            dict_db={'host':host,'user':user,'pwd':pwd,'my_user':my_user,'my_pwd':my_pwd,"port":port,"database":db,"role":role,"weight":weight,"status":status}
            dbinfos["db"+str(cnt)]=dict_db
        cursor.close()
        cnx.close()
        return json.dumps(dbinfos)
    except:
        cursor.close()
        cnx.close()
        return ""

##判断是否能正常连接到数据库
def check_conn_host():
    try:
        cnx = mysql.connector.connect(user='root', password='Abcd.1234',
                              host='192.168.42.60',
                              database='rwlb')
        cursor = cnx.cursor()
        cmdString="select user()"
        cnt=-1
        cursor.execute(cmdString)
        for user in cursor:
            cnt=len(user)
        cursor.close()
        cnx.close()
        return cnt
    except :
        return -1;


##select 属于读操作,其他属于写操作-----这里可以划分的更详细,比如执行存储过程等
def analyze_sql_state(sql):
    if "select" in sql:
        return "R"
    else:
        return "W"

##读取时间信息
def read_mmap_time():
    global mmap_time,mmap_file
    mmap_time.seek(0)
    ##初始时间
    inittime=int(mmap_time.read().translate(None, b'\x00').decode())
    ##当前时间
    endtime=int(time.time())
    ##时间差
    dis_time=endtime-inittime
    print("dis_time:"+str(dis_time))
    #重新读取数据
    if dis_time>10:
        ##当配置库正常的情况下才重新读取数据
        print(str(check_conn_host()))
        if check_conn_host()>0:           
            print("read data again")
            mmap_time.seek(0)
            mmap_file.seek(0)
            mmap_time.write(b'\x00')
            mmap_file.write(b'\x00')
            get_mmap_time()
            get_mmap_info()
        else:
            print("can not connect to host")            
    #不重新读取数据
    else:
        print("do not read data again")


##从内存中读取信息,
def read_mmap_info(sql):
    read_mmap_time()
    print("The data is in memory")
    global mmap_file,dict_db
    mmap_file.seek(0)
    ##把二进制转换为字符串
    info_str=mmap_file.read().translate(None, b'\x00').decode()
    #3把字符串转成json格式,方便后面转换为字典使用
    infos=json.loads(info_str)   
    host_count=len(infos)
    ##权重列表
    listw=[]
    ##总的权重数量
    wtotal=0
    ##数据库角色
    dbrole=analyze_sql_state(sql)
    ##根据权重初始化一个列表。这个是比较简单的算法,所以权重和控制在100以内比较好----这里可以选择其他比较好的算法
    for i in range(host_count):
        db="db"+str(i)
        if dbrole in infos[db]["role"]:
            if int(infos[db]["status"])==1:
                w=infos[db]["weight"]
                wtotal=wtotal+w
                for j in range(w):
                    listw.append(i)
    if wtotal >0:
        ##产生一个随机数
        rad=random.randint(0,wtotal-1)
        ##读取随机数所在的列表位置的数据
        dbindex=listw[rad]
        ##确定选择的是哪个db
        db="db"+str(dbindex)
        ##为dict_db赋值,即选取的db的信息
        dict_db=infos[db]
        return dict_db
    else :
        return {}


##如果内存中没有时间信息,则向内存红写入时间信息
def get_mmap_time():
    global mmap_time
    ##第二个参数1024是设定的内存大小,单位:字节。如果内容较多,可以调大一点
    mmap_time = mmap.mmap(-1, 1024, access = mmap.ACCESS_WRITE, tagname = 'share_time')
    ##读取有效比特数,不包括空比特
    cnt=mmap_time.read_byte()
    if cnt==0:
        print("Load time to memory")
        mmap_time = mmap.mmap(0, 1024, access = mmap.ACCESS_WRITE, tagname = 'share_time')
        inittime=str(int(time.time()))
        mmap_time.write(inittime.encode())


##如果内存中没有对应信息,则向内存中写信息以供下次调用使用
def get_mmap_info():
    global mmap_file
    ##第二个参数1024是设定的内存大小,单位:字节。如果内容较多,可以调大一点
    mmap_file = mmap.mmap(-1, 1024, access = mmap.ACCESS_WRITE, tagname = 'share_mmap')
    ##读取有效比特数,不包括空比特
    cnt=mmap_file.read_byte()
    if cnt==0:
        print("Load data to memory")
        mmap_file = mmap.mmap(0, 1024, access = mmap.ACCESS_WRITE, tagname = 'share_mmap')
        mmap_file.write(get_json_str().encode())

##测试函数
def test1():
    get_mmap_time()
    get_mmap_info()
    for i in range(10):
        sql="select * from db"
        #sql="update t set col1=a where b=2"
        dbrole=analyze_sql_state(sql)
        dict_db=read_mmap_info(sql)
        print(dict_db["host"])

def test2():
    sql="select * from db"
    res=analyze_sql_state(sql)
    print("select:"+res)
    sql="update t set col1=a where b=2"
    res=analyze_sql_state(sql)
    print("update:"+res)
    sql="insert into t values(1,2)"
    res=analyze_sql_state(sql)
    print("insert:"+res)
    sql="delete from t where b=2"
    res=analyze_sql_state(sql)
    print("delete:"+res)


##类似主函数
if __name__=="__main__":
    test2()

 

测试结果:

 

Oracle 2

 

从结果能够看来,只有首先次向内部存款和储蓄器加载数据,并且根据权重得以实现了负荷均衡。

因为测试函数test一()写的是原则性语句,所以读写分离的结果未有浮现出来。

 

此外:测试使用的数码库表结构及数据:

 

 desc rwlb;
+---------+-------------+------+-----+---------+-------+
| Field   | Type        | Null | Key | Default | Extra |
+---------+-------------+------+-----+---------+-------+
| host    | varchar(50) | YES  |     | NULL    |       |
| user    | varchar(50) | YES  |     | NULL    |       |
| pwd     | varchar(50) | YES  |     | NULL    |       |
| my_user | varchar(50) | YES  |     | NULL    |       |
| my_pwd  | varchar(50) | YES  |     | NULL    |       |
| role    | varchar(10) | YES  |     | NULL    |       |
| weight  | int(11)     | YES  |     | NULL    |       |
| status  | int(11)     | YES  |     | NULL    |       |
| port    | int(11)     | YES  |     | NULL    |       |
| db      | varchar(50) | YES  |     | NULL    |       |
+---------+-------------+------+-----+---------+-------+

select * from rwlb;
+---------------+------+----------+---------+-----------+------+--------+--------+------+------+
| host          | user | pwd      | my_user | my_pwd    | role | weight | status | port | db   |
+---------------+------+----------+---------+-----------+------+--------+--------+------+------+
| 192.168.42.60 | root | Abcd1234 | root    | Abcd.1234 | RW   |     10 |      1 | NULL | NULL |
| 192.168.42.61 | root | Abcd1234 | root    | Abcd.1234 | R    |     20 |      1 | NULL | NULL |
+---------------+------+----------+---------+-----------+------+--------+--------+------+------+

相关文章