Thrift

First, walk through the example from the official site.

It uses two files: tutorial.thrift and shared.thrift.

~/thrift/bin/thrift -r --gen py:utf8strings -out ./thrift_gen ./tutorial.thrift

This generates three kinds of files: constants, ttypes, and the service modules Calculator.py and SharedService.py.

constants holds the const values. Every const declared in the .thrift file ends up here:

const i32 INT32CONSTANT = 9853
const map<string,string> MAPCONSTANT = {'hello':'world', 'goodnight':'moon'}
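On the Python side those two declarations come out as ordinary module-level names. A rough sketch of what the generated constants module holds, assuming the generator emits plain assignments (the exact layout may differ between compiler versions):

```python
# Sketch of the Python that the two const declarations above turn into.
# Assumption: the generator emits plain module-level assignments.
INT32CONSTANT = 9853
MAPCONSTANT = {
    "hello": "world",
    "goodnight": "moon",
}

# Callers read them like any other module attribute.
print(INT32CONSTANT, MAPCONSTANT["hello"])
```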

ttypes defines the enum and struct types:

class Operation:
  ADD = 1
  SUBTRACT = 2
  MULTIPLY = 3
  DIVIDE = 4
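Since the enum is just a class of integer attributes, turning a value back into a name needs a lookup table. The sketch below adds two such tables next to the values; the table names follow what I believe the generated code uses, and `describe` is a hypothetical helper made up for this note.

```python
class Operation:
    ADD = 1
    SUBTRACT = 2
    MULTIPLY = 3
    DIVIDE = 4

    # Lookup tables in both directions (assumed to mirror the generated code).
    _VALUES_TO_NAMES = {1: "ADD", 2: "SUBTRACT", 3: "MULTIPLY", 4: "DIVIDE"}
    _NAMES_TO_VALUES = {"ADD": 1, "SUBTRACT": 2, "MULTIPLY": 3, "DIVIDE": 4}


def describe(op):
    """Hypothetical helper: map an enum value to a readable name."""
    return Operation._VALUES_TO_NAMES.get(op, "UNKNOWN(%d)" % op)


print(describe(Operation.MULTIPLY))
print(describe(9))
```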

Each struct gets generated read and write methods:

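class Work:
	def __init__(self, ...):   # one keyword argument per struct field
		pass
	def read(self, iprot):     # fill the fields from the input protocol
		...
	def write(self, oprot):    # serialize the fields to the output protocol
		...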
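To make read and write concrete, here is a hand-written stand-in that round-trips two fields through a byte buffer. `ToyWork`, its field ids, and the direct use of a file-like object instead of a protocol are all assumptions of this sketch; the byte layout (type byte, 2-byte field id, value, then a stop byte) follows my understanding of the binary protocol and is not taken from the generated code.

```python
import io
import struct

# Type codes in the style of Thrift's TType (STOP = 0, I32 = 8, STRING = 11).
T_STOP, T_I32, T_STRING = 0, 8, 11


class ToyWork:
    """Hand-written stand-in for a generated struct with two fields."""

    def __init__(self, num1=0, comment=None):
        self.num1 = num1
        self.comment = comment

    def write(self, out):
        # Field header: 1-byte type + 2-byte field id, then the value.
        out.write(struct.pack("!bh", T_I32, 1))
        out.write(struct.pack("!i", self.num1))
        if self.comment is not None:
            data = self.comment.encode("utf-8")
            out.write(struct.pack("!bh", T_STRING, 2))
            out.write(struct.pack("!i", len(data)) + data)
        out.write(struct.pack("!b", T_STOP))  # end-of-struct marker

    def read(self, inp):
        while True:
            (ftype,) = struct.unpack("!b", inp.read(1))
            if ftype == T_STOP:
                break
            (fid,) = struct.unpack("!h", inp.read(2))
            if fid == 1 and ftype == T_I32:
                (self.num1,) = struct.unpack("!i", inp.read(4))
            elif fid == 2 and ftype == T_STRING:
                (size,) = struct.unpack("!i", inp.read(4))
                self.comment = inp.read(size).decode("utf-8")
            else:
                raise ValueError("unexpected field %d of type %d" % (fid, ftype))
        return self  # returned for convenience in this sketch


buf = io.BytesIO()
ToyWork(num1=15, comment="hi").write(buf)
buf.seek(0)
copy = ToyWork().read(buf)
print(copy.num1, copy.comment)
```

The optional field is simply left out of the stream when it is None, which is why the reader loops until it sees the stop byte instead of expecting a fixed number of fields.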

Last is the service file, which defines the exposed interface:

def getStruct(self, key):
	self.send_getStruct(key)
	return self.recv_getStruct()

The generated code builds on TProcessor, TTransport, TBinaryProtocol, TProtocol and fastbinary.

TProtocol is the wire protocol, TTransport is the socket layer, and TProcessor handles the method calls.

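TProtocol defines the wire protocol. Thrift's TProtocolBase declares the begin and end methods of the protocol: writeMessageBegin writes the message header, writeStructBegin opens a struct (such as a method's arguments), and writeFieldBegin writes a field's type and id. For example, in TCompactProtocol: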

  def writeMessageBegin(self, name, type, seqid):
    assert self.state == CLEAR
    self.__writeUByte(self.PROTOCOL_ID)
    self.__writeUByte(self.VERSION | (type << self.TYPE_SHIFT_AMOUNT))
    self.__writeVarint(seqid)
    self.__writeString(name)
    self.state = VALUE_WRITE

  def writeMessageEnd(self):
    assert self.state == VALUE_WRITE
    self.state = CLEAR

  def writeStructBegin(self, name):
    assert self.state in (CLEAR, CONTAINER_WRITE, VALUE_WRITE), self.state
    self.__structs.append((self.state, self.__last_fid))
    self.state = FIELD_WRITE
    self.__last_fid = 0
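The begin/end pairing is easier to see with the state machine and varints stripped away. Below is a toy protocol that writes only a message header and reads it back. `ToyProtocol` and its byte layout (type byte, seqid, length-prefixed name) are invented for this note and do not match any real Thrift protocol byte for byte.

```python
import io
import struct

CALL, REPLY = 1, 2  # message types, numbered as in Thrift's TMessageType


class ToyProtocol:
    """Minimal message framing: type byte, seqid, length-prefixed name."""

    def __init__(self, trans):
        self.trans = trans

    def writeMessageBegin(self, name, mtype, seqid):
        data = name.encode("utf-8")
        self.trans.write(struct.pack("!bii", mtype, seqid, len(data)) + data)

    def writeMessageEnd(self):
        pass  # nothing to close in this toy format

    def readMessageBegin(self):
        # The fixed part of the header is 1 + 4 + 4 = 9 bytes.
        mtype, seqid, size = struct.unpack("!bii", self.trans.read(9))
        name = self.trans.read(size).decode("utf-8")
        return name, mtype, seqid

    def readMessageEnd(self):
        pass


buf = io.BytesIO()
proto = ToyProtocol(buf)
proto.writeMessageBegin("ping", CALL, 7)
proto.writeMessageEnd()

buf.seek(0)
header = proto.readMessageBegin()
proto.readMessageEnd()
print(header)
```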

Let's read this alongside the client and server code:

# Make the socket: this is the raw transport
from thrift.transport import TSocket
transport = TSocket.TSocket('localhost', 9090)
# Buffering is critical. Raw sockets are very slow.
from thrift.transport import TTransport
transport = TTransport.TBufferedTransport(transport)
# Wrap the transport in a protocol
from thrift.protocol import TBinaryProtocol
protocol = TBinaryProtocol.TBinaryProtocol(transport)
# Create a client that uses the protocol encoder
# (assumes the generated ./thrift_gen directory is on sys.path)
from tutorial import Calculator
client = Calculator.Client(protocol)
# Connect!
transport.open()
client.ping()
print('ping()')

The processor is the class that parses and handles requests; processMap maps every method name to its handler. Each process call goes through these steps:

(name, type, seqid) = iprot.readMessageBegin()
args.read(iprot)
iprot.readMessageEnd()
init result
handle it:
    send
    recv
write
flush
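The steps above can be sketched as a small dispatcher. To stay self-contained it swaps the Thrift protocol for one JSON object per line, so `ToyProcessor`, `CalculatorHandler` and the message shape are all hypothetical; only the read, look up, handle, write, flush sequence is the point.

```python
import io
import json


class CalculatorHandler:
    """Hypothetical handler: the user-written business logic."""

    def add(self, num1, num2):
        return num1 + num2

    def ping(self):
        return None


class ToyProcessor:
    def __init__(self, handler):
        self._handler = handler
        # method name -> function that knows how to serve it
        self._processMap = {"add": self.process_add, "ping": self.process_ping}

    def process(self, inp, out):
        # 1. read the message header and the arguments
        msg = json.loads(inp.readline().decode("utf-8"))
        name, seqid, args = msg["name"], msg["seqid"], msg["args"]
        # 2. look the method up and run it
        fn = self._processMap.get(name)
        if fn is None:
            reply = {"name": name, "seqid": seqid, "error": "unknown method"}
        else:
            reply = {"name": name, "seqid": seqid, "result": fn(args)}
        # 3. write the reply and flush
        out.write(json.dumps(reply).encode("utf-8") + b"\n")
        out.flush()

    def process_add(self, args):
        return self._handler.add(args["num1"], args["num2"])

    def process_ping(self, args):
        return self._handler.ping()


inp = io.BytesIO(b'{"name": "add", "seqid": 1, "args": {"num1": 2, "num2": 3}}\n')
out = io.BytesIO()
ToyProcessor(CalculatorHandler()).process(inp, out)
reply = json.loads(out.getvalue().decode("utf-8"))
print(reply)
```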

What the transport does (TBufferedTransport.read):

  def read(self, sz):
    ret = self.__rbuf.read(sz)
    if len(ret) != 0:
      return ret

    self.__rbuf = StringIO(self.__trans.read(max(sz, self.__rbuf_size)))
    return self.__rbuf.read(sz)
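The effect of that read method is that many small reads cost only one read on the underlying transport. A minimal re-creation with a counter on the underlying source shows it; `CountingSource` and `ToyBufferedReader` are names made up for this note, and the buffer size of 4096 is an assumption.

```python
import io


class CountingSource:
    """Stand-in for the underlying transport; counts how often it is read."""

    def __init__(self, data):
        self._data = io.BytesIO(data)
        self.reads = 0

    def read(self, sz):
        self.reads += 1
        return self._data.read(sz)


class ToyBufferedReader:
    def __init__(self, trans, rbuf_size=4096):
        self._trans = trans
        self._rbuf_size = rbuf_size
        self._rbuf = io.BytesIO(b"")

    def read(self, sz):
        ret = self._rbuf.read(sz)
        if len(ret) != 0:
            return ret  # served from the buffer, no underlying read
        # Buffer is empty: refill it with one large read, then serve from it.
        self._rbuf = io.BytesIO(self._trans.read(max(sz, self._rbuf_size)))
        return self._rbuf.read(sz)


source = CountingSource(b"abcdefgh")
reader = ToyBufferedReader(source)
chunks = [reader.read(2) for _ in range(4)]
print(chunks, source.reads)
```

Like the method it imitates, read may return fewer than sz bytes when the buffer is nearly drained, so a caller that needs an exact count has to loop.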

TSocket:

import socket

  def open(self):
    try:
      res0 = self._resolveAddr()
      for res in res0:
        self.handle = socket.socket(res[0], res[1])
        self.handle.settimeout(self._timeout)
        try:
          self.handle.connect(res[4])
        except socket.error, e:
          if res is not res0[-1]:
            continue
          else:
            raise e
        break
    except socket.error, e:
      if self._unix_socket:
        message = 'Could not connect to socket %s' % self._unix_socket
      else:
        message = 'Could not connect to %s:%d' % (self.host, self.port)
      raise TTransportException(type=TTransportException.NOT_OPEN, message=message)
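The same try-each-address loop can be written against the standard library alone. This sketch assumes _resolveAddr returns getaddrinfo-style tuples, which the res[0], res[1], res[4] indexing suggests; `open_first` is a hypothetical name, and it raises a plain IOError where TSocket raises TTransportException.

```python
import socket


def open_first(host, port, timeout=1.0):
    """Try every resolved address in turn; fail only after all of them fail."""
    addrs = socket.getaddrinfo(host, port, socket.AF_UNSPEC, socket.SOCK_STREAM)
    last_error = None
    for family, socktype, proto, _canonname, sockaddr in addrs:
        handle = socket.socket(family, socktype, proto)
        handle.settimeout(timeout)
        try:
            handle.connect(sockaddr)
            return handle
        except socket.error as e:
            handle.close()
            last_error = e
    raise IOError("Could not connect to %s:%d (%s)" % (host, port, last_error))


# Usage: connect to a throwaway listener on an OS-assigned local port.
listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listener.bind(("127.0.0.1", 0))
listener.listen(1)
port = listener.getsockname()[1]

conn = open_first("127.0.0.1", port)
peer = conn.getpeername()
conn.close()
listener.close()
print(peer)
```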

Key point: the transport is just data moving over a socket.

Client:

send:
	self._oprot.writeMessageBegin
	init args, then args.write(self._oprot)
	self._oprot.writeMessageEnd
	flush
recv:
	(fname, mtype, rseqid) = iprot.readMessageBegin()
	result.read(iprot)
	iprot.readMessageEnd()
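A toy client makes the send/recv split concrete. It again uses one JSON object per line instead of a Thrift protocol, and `LoopbackTransport`, `ToyClient` and `fake_server` are hypothetical names; the seqid check in recv_add is this sketch's own addition, not something taken from the generated code.

```python
import io
import json


class LoopbackTransport:
    """Hypothetical transport: on flush, hands the request to `server` and buffers the reply."""

    def __init__(self, server):
        self._server = server
        self._wbuf = io.BytesIO()
        self._rbuf = io.BytesIO()

    def write(self, data):
        self._wbuf.write(data)

    def flush(self):
        request = self._wbuf.getvalue()
        self._wbuf = io.BytesIO()
        self._rbuf = io.BytesIO(self._server(request))

    def readline(self):
        return self._rbuf.readline()


class ToyClient:
    def __init__(self, trans):
        self._trans = trans
        self._seqid = 0

    def add(self, num1, num2):
        self.send_add(num1, num2)
        return self.recv_add()

    def send_add(self, num1, num2):
        self._seqid += 1
        msg = {"name": "add", "seqid": self._seqid,
               "args": {"num1": num1, "num2": num2}}
        self._trans.write(json.dumps(msg).encode("utf-8") + b"\n")
        self._trans.flush()

    def recv_add(self):
        reply = json.loads(self._trans.readline().decode("utf-8"))
        if reply["seqid"] != self._seqid:
            raise RuntimeError("reply out of sequence")
        return reply["result"]


def fake_server(request):
    """Stand-in for the remote side: adds the two arguments."""
    msg = json.loads(request.decode("utf-8"))
    total = msg["args"]["num1"] + msg["args"]["num2"]
    reply = {"seqid": msg["seqid"], "result": total}
    return json.dumps(reply).encode("utf-8") + b"\n"


client = ToyClient(LoopbackTransport(fake_server))
print(client.add(1, 1), client.add(40, 2))
```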

Now the server side:

class TSimpleServer(TServer):

  """Simple single-threaded server that just pumps around one transport."""

  def __init__(self, *args):
    TServer.__init__(self, *args)

  def serve(self):
    self.serverTransport.listen()
    while True:
      client = self.serverTransport.accept()
      itrans = self.inputTransportFactory.getTransport(client)
      otrans = self.outputTransportFactory.getTransport(client)
      iprot = self.inputProtocolFactory.getProtocol(itrans)
      oprot = self.outputProtocolFactory.getProtocol(otrans)
      try:
        while True:
          self.processor.process(iprot, oprot)
      except TTransport.TTransportException, tx:
        pass
      except Exception, x:
        logging.exception(x)

      itrans.close()
      otrans.close()

process just keeps running in a loop. The same accept-and-loop pattern with plain sockets:

server.py
import socket

sk = socket.socket()
sk.bind(("127.0.0.1", 9092))
sk.listen(5)
while True:
    client, addr = sk.accept()
    # echo everything back until the client closes the connection
    while True:
        data = client.recv(1024)
        if not data:
            break
        print("get from client:%s" % data)
        client.sendall(data)
    client.close()

client.py
import socket
s = socket.socket()
s.connect(("127.0.0.1", 9092))
s.sendall(b"i am client")
print("get from server:%s" % s.recv(1024))
s.close()

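Putting the whole flow together:

Client side: open a socket connection, then write requests to the socket and read replies from it according to the protocol. Server side: create a socket, listen for connections, and process each request.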
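That flow fits in one small runnable file. The sketch below connects a client and a server through socket.socketpair, uses a 4-byte length prefix as a stand-in protocol, and has the server upper-case each request in place of real processing. All helper names are made up for this note and nothing here is Thrift's actual wire format.

```python
import socket
import struct
import threading


def send_frame(sock, payload):
    # "protocol": a 4-byte big-endian length, then the payload
    sock.sendall(struct.pack("!i", len(payload)) + payload)


def recv_exact(sock, n):
    chunks = []
    while n > 0:
        chunk = sock.recv(n)
        if not chunk:
            raise EOFError("peer closed the connection")
        chunks.append(chunk)
        n -= len(chunk)
    return b"".join(chunks)


def recv_frame(sock):
    (size,) = struct.unpack("!i", recv_exact(sock, 4))
    return recv_exact(sock, size)


def serve(sock):
    # Server side: keep processing requests until the client hangs up.
    try:
        while True:
            request = recv_frame(sock)
            send_frame(sock, request.upper())
    except EOFError:
        pass
    finally:
        sock.close()


client_sock, server_sock = socket.socketpair()
worker = threading.Thread(target=serve, args=(server_sock,))
worker.start()

# Client side: write a request, then read the reply.
send_frame(client_sock, b"ping")
reply = recv_frame(client_sock)
client_sock.close()
worker.join()
print(reply)
```

The length prefix matters because recv may return part of a message or several messages at once; the frame helpers are what turn the byte stream back into whole requests.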