@@ -23,24 +23,24 @@ def next_event(self):
2323 """获取下一个事件"""
2424 if self ._finish :
2525 raise StopIteration
26- total_byte_length = struct .unpack ('>I' , self ._raw .read (4 ))[0 ] # message总长度
27- header_byte_length = struct .unpack ('>I' , self ._raw .read (4 ))[0 ] # header总长度
28- prelude_crc = struct .unpack ('>I' , self ._raw .read (4 ))[0 ]
26+ total_byte_length = struct .unpack ('>I' , bytes ( self ._raw .read (4 ) ))[0 ] # message总长度
27+ header_byte_length = struct .unpack ('>I' , bytes ( self ._raw .read (4 ) ))[0 ] # header总长度
28+ prelude_crc = struct .unpack ('>I' , bytes ( self ._raw .read (4 ) ))[0 ]
2929 # 处理headers
3030 offset = 0
3131 msg_headers = {}
3232 while offset < header_byte_length :
33- header_name_length = struct .unpack ('>B' , self ._raw .read (1 ))[0 ]
33+ header_name_length = struct .unpack ('>B' , bytes ( self ._raw .read (1 ) ))[0 ]
3434 header_name = self ._raw .read (header_name_length )
35- header_value_type = struct .unpack ('>B' , self ._raw .read (1 ))[0 ]
36- header_value_length = struct .unpack ('>H' , self ._raw .read (2 ))[0 ]
35+ header_value_type = struct .unpack ('>B' , bytes ( self ._raw .read (1 ) ))[0 ]
36+ header_value_length = struct .unpack ('>H' , bytes ( self ._raw .read (2 ) ))[0 ]
3737 header_value = self ._raw .read (header_value_length )
3838 msg_headers [header_name ] = header_value
3939 offset += 4 + header_name_length + header_value_length
4040 # 处理payload
4141 payload_byte_length = total_byte_length - header_byte_length - 16 # payload总长度
4242 payload = self ._raw .read (payload_byte_length )
43- message_crc = struct .unpack ('>I' , self ._raw .read (4 ))[0 ]
43+ message_crc = struct .unpack ('>I' , bytes ( self ._raw .read (4 ) ))[0 ]
4444 if ':message-type' in msg_headers and msg_headers [':message-type' ] == 'event' :
4545 if ':event-type' in msg_headers and msg_headers [':event-type' ] == "Records" :
4646 return {'Records' : {'Payload' : payload }}
0 commit comments