@@ -112,6 +112,7 @@ public class WebSocket : NSObject, NSStreamDelegate {
112112 public var security : SSLSecurity ?
113113 public var enabledSSLCipherSuites : [ SSLCipherSuite ] ?
114114 public var origin : String ?
115+ public var timeout = 5
115116 public var isConnected : Bool {
116117 return connected
117118 }
@@ -319,12 +320,12 @@ public class WebSocket : NSObject, NSStreamDelegate {
319320 self . mutex. unlock ( )
320321
321322 let bytes = UnsafePointer < UInt8 > ( data. bytes)
322- var timeout = 5000000 //wait 5 seconds before giving up
323+ var out = timeout * 1000000 //wait 5 seconds before giving up
323324 writeQueue. addOperationWithBlock { [ weak self] in
324325 while !outStream. hasSpaceAvailable {
325326 usleep ( 100 ) //wait until the socket is ready
326- timeout -= 100
327- if timeout < 0 {
327+ out -= 100
328+ if out < 0 {
328329 self ? . cleanupStream ( )
329330 self ? . doDisconnect ( self ? . errorWithDetail ( " write wait timed out " , code: 2 ) )
330331 return
@@ -405,25 +406,24 @@ public class WebSocket : NSObject, NSStreamDelegate {
405406 }
406407 ///dequeue the incoming input so it is processed in order
407408 private func dequeueInput( ) {
408- guard !inputQueue. isEmpty else { return }
409-
410- let data = inputQueue [ 0 ]
411- var work = data
412- if let fragBuffer = fragBuffer {
413- let combine = NSMutableData ( data: fragBuffer)
414- combine. appendData ( data)
415- work = combine
416- self . fragBuffer = nil
417- }
418- let buffer = UnsafePointer < UInt8 > ( work. bytes)
419- let length = work. length
420- if !connected {
421- processTCPHandshake ( buffer, bufferLen: length)
422- } else {
423- processRawMessage ( buffer, bufferLen: length)
409+ while !inputQueue. isEmpty {
410+ let data = inputQueue [ 0 ]
411+ var work = data
412+ if let fragBuffer = fragBuffer {
413+ let combine = NSMutableData ( data: fragBuffer)
414+ combine. appendData ( data)
415+ work = combine
416+ self . fragBuffer = nil
417+ }
418+ let buffer = UnsafePointer < UInt8 > ( work. bytes)
419+ let length = work. length
420+ if !connected {
421+ processTCPHandshake ( buffer, bufferLen: length)
422+ } else {
423+ processRawMessagesInBuffer ( buffer, bufferLen: length)
424+ }
425+ inputQueue = inputQueue. filter { $0 != data}
424426 }
425- inputQueue = inputQueue. filter { $0 != data}
426- dequeueInput ( )
427427 }
428428
429429 //handle checking the inital connection status
@@ -469,7 +469,7 @@ public class WebSocket : NSObject, NSStreamDelegate {
469469 totalSize += 1 //skip the last \n
470470 let restSize = bufferLen - totalSize
471471 if restSize > 0 {
472- processRawMessage ( ( buffer+ totalSize) , bufferLen: restSize)
472+ processRawMessagesInBuffer ( buffer + totalSize, bufferLen: restSize)
473473 }
474474 return 0 //success
475475 }
@@ -522,12 +522,15 @@ public class WebSocket : NSObject, NSStreamDelegate {
522522 }
523523 }
524524
525- ///process the websocket data
526- private func processRawMessage( buffer: UnsafePointer < UInt8 > , bufferLen: Int) {
525+ /// Process one message at the start of `buffer`. Return another buffer (sharing storage) that contains the leftover contents of `buffer` that I didn't process.
526+ @warn_unused_result
527+ private func processOneRawMessage( inBuffer buffer: UnsafeBufferPointer < UInt8 > ) -> UnsafeBufferPointer< UInt8 > {
527528 let response = readStack. last
529+ let baseAddress = buffer. baseAddress
530+ let bufferLen = buffer. count
528531 if response != nil && bufferLen < 2 {
529- fragBuffer = NSData ( bytes : buffer, length : bufferLen )
530- return
532+ fragBuffer = NSData ( buffer: buffer )
533+ return emptyBuffer
531534 }
532535 if let response = response where response. bytesLeft > 0 {
533536 var len = response. bytesLeft
@@ -537,45 +540,41 @@ public class WebSocket : NSObject, NSStreamDelegate {
537540 extra = 0
538541 }
539542 response. bytesLeft -= len
540- response. buffer? . appendData ( NSData ( bytes: buffer , length: len) )
543+ response. buffer? . appendData ( NSData ( bytes: baseAddress , length: len) )
541544 processResponse ( response)
542- let offset = bufferLen - extra
543- if extra > 0 {
544- processExtra ( ( buffer+ offset) , bufferLen: extra)
545- }
546- return
545+ return buffer. fromOffset ( bufferLen - extra)
547546 } else {
548- let isFin = ( FinMask & buffer [ 0 ] )
549- let receivedOpcode = OpCode ( rawValue: ( OpCodeMask & buffer [ 0 ] ) )
550- let isMasked = ( MaskMask & buffer [ 1 ] )
551- let payloadLen = ( PayloadLenMask & buffer [ 1 ] )
547+ let isFin = ( FinMask & baseAddress [ 0 ] )
548+ let receivedOpcode = OpCode ( rawValue: ( OpCodeMask & baseAddress [ 0 ] ) )
549+ let isMasked = ( MaskMask & baseAddress [ 1 ] )
550+ let payloadLen = ( PayloadLenMask & baseAddress [ 1 ] )
552551 var offset = 2
553- if ( isMasked > 0 || ( RSVMask & buffer [ 0 ] ) > 0 ) && receivedOpcode != . Pong {
552+ if ( isMasked > 0 || ( RSVMask & baseAddress [ 0 ] ) > 0 ) && receivedOpcode != . Pong {
554553 let errCode = CloseCode . ProtocolError. rawValue
555554 doDisconnect ( errorWithDetail ( " masked and rsv data is not currently supported " , code: errCode) )
556555 writeError ( errCode)
557- return
556+ return emptyBuffer
558557 }
559558 let isControlFrame = ( receivedOpcode == . ConnectionClose || receivedOpcode == . Ping)
560559 if !isControlFrame && ( receivedOpcode != . BinaryFrame && receivedOpcode != . ContinueFrame &&
561560 receivedOpcode != . TextFrame && receivedOpcode != . Pong) {
562561 let errCode = CloseCode . ProtocolError. rawValue
563562 doDisconnect ( errorWithDetail ( " unknown opcode: \( receivedOpcode) " , code: errCode) )
564563 writeError ( errCode)
565- return
564+ return emptyBuffer
566565 }
567566 if isControlFrame && isFin == 0 {
568567 let errCode = CloseCode . ProtocolError. rawValue
569568 doDisconnect ( errorWithDetail ( " control frames can't be fragmented " , code: errCode) )
570569 writeError ( errCode)
571- return
570+ return emptyBuffer
572571 }
573572 if receivedOpcode == . ConnectionClose {
574573 var code = CloseCode . Normal. rawValue
575574 if payloadLen == 1 {
576575 code = CloseCode . ProtocolError. rawValue
577576 } else if payloadLen > 1 {
578- code = WebSocket . readUint16 ( buffer , offset: offset)
577+ code = WebSocket . readUint16 ( baseAddress , offset: offset)
579578 if code < 1000 || ( code > 1003 && code < 1007 ) || ( code > 1011 && code < 3000 ) {
580579 code = CloseCode . ProtocolError. rawValue
581580 }
@@ -584,7 +583,7 @@ public class WebSocket : NSObject, NSStreamDelegate {
584583 if payloadLen > 2 {
585584 let len = Int ( payloadLen- 2 )
586585 if len > 0 {
587- let bytes = UnsafePointer < UInt8 > ( ( buffer + offset) )
586+ let bytes = baseAddress + offset
588587 let str : NSString ? = NSString ( data: NSData ( bytes: bytes, length: len) , encoding: NSUTF8StringEncoding)
589588 if str == nil {
590589 code = CloseCode . ProtocolError. rawValue
@@ -593,23 +592,23 @@ public class WebSocket : NSObject, NSStreamDelegate {
593592 }
594593 doDisconnect ( errorWithDetail ( " connection closed by server " , code: code) )
595594 writeError ( code)
596- return
595+ return emptyBuffer
597596 }
598597 if isControlFrame && payloadLen > 125 {
599598 writeError ( CloseCode . ProtocolError. rawValue)
600- return
599+ return emptyBuffer
601600 }
602601 var dataLength = UInt64 ( payloadLen)
603602 if dataLength == 127 {
604- dataLength = WebSocket . readUint64 ( buffer , offset: offset)
603+ dataLength = WebSocket . readUint64 ( baseAddress , offset: offset)
605604 offset += sizeof ( UInt64)
606605 } else if dataLength == 126 {
607- dataLength = UInt64 ( WebSocket . readUint16 ( buffer , offset: offset) )
606+ dataLength = UInt64 ( WebSocket . readUint16 ( baseAddress , offset: offset) )
608607 offset += sizeof ( UInt16)
609608 }
610609 if bufferLen < offset || UInt64 ( bufferLen - offset) < dataLength {
611- fragBuffer = NSData ( bytes: buffer , length: bufferLen)
612- return
610+ fragBuffer = NSData ( bytes: baseAddress , length: bufferLen)
611+ return emptyBuffer
613612 }
614613 var len = dataLength
615614 if dataLength > UInt64 ( bufferLen) {
@@ -620,7 +619,7 @@ public class WebSocket : NSObject, NSStreamDelegate {
620619 len = 0
621620 data = NSData ( )
622621 } else {
623- data = NSData ( bytes: UnsafePointer < UInt8 > ( ( buffer + offset) ) , length: Int ( len) )
622+ data = NSData ( bytes: baseAddress + offset, length: Int ( len) )
624623 }
625624 if receivedOpcode == . Pong {
626625 if canDispatch {
@@ -630,12 +629,7 @@ public class WebSocket : NSObject, NSStreamDelegate {
630629 s. pongDelegate? . websocketDidReceivePong ( s)
631630 }
632631 }
633- let step = Int ( offset+ numericCast( len) )
634- let extra = bufferLen- step
635- if extra > 0 {
636- processRawMessage ( ( buffer+ step) , bufferLen: extra)
637- }
638- return
632+ return buffer. fromOffset ( offset + Int( len) )
639633 }
640634 var response = readStack. last
641635 if isControlFrame {
@@ -645,7 +639,7 @@ public class WebSocket : NSObject, NSStreamDelegate {
645639 let errCode = CloseCode . ProtocolError. rawValue
646640 doDisconnect ( errorWithDetail ( " continue frame before a binary or text frame " , code: errCode) )
647641 writeError ( errCode)
648- return
642+ return emptyBuffer
649643 }
650644 var isNew = false
651645 if response == nil {
@@ -654,7 +648,7 @@ public class WebSocket : NSObject, NSStreamDelegate {
654648 doDisconnect ( errorWithDetail ( " first frame can't be a continue frame " ,
655649 code: errCode) )
656650 writeError ( errCode)
657- return
651+ return emptyBuffer
658652 }
659653 isNew = true
660654 response = WSResponse ( )
@@ -669,7 +663,7 @@ public class WebSocket : NSObject, NSStreamDelegate {
669663 doDisconnect ( errorWithDetail ( " second and beyond of fragment message must be a continue frame " ,
670664 code: errCode) )
671665 writeError ( errCode)
672- return
666+ return emptyBuffer
673667 }
674668 response!. buffer!. appendData ( data)
675669 }
@@ -684,20 +678,18 @@ public class WebSocket : NSObject, NSStreamDelegate {
684678 }
685679
686680 let step = Int ( offset+ numericCast( len) )
687- let extra = bufferLen- step
688- if extra > 0 {
689- processExtra ( ( buffer+ step) , bufferLen: extra)
690- }
681+ return buffer. fromOffset ( step)
691682 }
692-
693683 }
694684
695- ///process the extra of a buffer
696- private func processExtra( buffer: UnsafePointer < UInt8 > , bufferLen: Int) {
697- if bufferLen < 2 {
698- fragBuffer = NSData ( bytes: buffer, length: bufferLen)
699- } else {
700- processRawMessage ( buffer, bufferLen: bufferLen)
685+ /// Process all messages in the buffer if possible.
686+ private func processRawMessagesInBuffer( pointer: UnsafePointer < UInt8 > , bufferLen: Int) {
687+ var buffer = UnsafeBufferPointer ( start: pointer, count: bufferLen)
688+ repeat {
689+ buffer = processOneRawMessage ( inBuffer: buffer)
690+ } while buffer. count >= 2
691+ if buffer. count > 0 {
692+ fragBuffer = NSData ( buffer: buffer)
701693 }
702694 }
703695
@@ -835,6 +827,25 @@ public class WebSocket : NSObject, NSStreamDelegate {
835827
836828}
837829
830+ private extension NSData {
831+
832+ convenience init( buffer: UnsafeBufferPointer < UInt8 > ) {
833+ self . init ( bytes: buffer. baseAddress, length: buffer. count)
834+ }
835+
836+ }
837+
838+ private extension UnsafeBufferPointer {
839+
840+ func fromOffset( offset: Int ) -> UnsafeBufferPointer < Element > {
841+ return UnsafeBufferPointer < Element > ( start: baseAddress. advancedBy ( offset) , count: count - offset)
842+ }
843+
844+ }
845+
846+ private let emptyBuffer = UnsafeBufferPointer < UInt8 > ( start: nil , count: 0 )
847+
848+
838849public class SSLCert {
839850 var certData : NSData ?
840851 var key : SecKeyRef ?
0 commit comments