Class Jabber::Bytestreams::IBB
In: lib/xmpp4r/bytestreams/helper/ibb/base.rb
Parent: Object

In-Band Bytestreams (JEP-0047) implementation

Don‘t use directly, use IBBInitiator and IBBTarget

In-Band Bytestreams should only be used when transferring very small amounts of binary data, because it is slow and increases server load drastically.

Note that the constructor takes a lot of arguments. In-Band Bytestreams do not specify a way to initiate the stream, this should be done via Stream Initiation.

Methods

activate   active?   close   deactivate   flush   new   read   send_data   write  

Constants

NS_IBB = 'http://jabber.org/protocol/ibb'

Public Class methods

Create a new bytestream

Will register a <message/> callback to intercept data of this stream. This data will be buffered, you can retrieve it with receive

[Source]

    # File lib/xmpp4r/bytestreams/helper/ibb/base.rb, line 30
30:       def initialize(stream, session_id, my_jid, peer_jid)
31:         @stream = stream
32:         @session_id = session_id
33:         @my_jid = (my_jid.kind_of?(String) ? JID.new(my_jid) : my_jid)
34:         @peer_jid = (peer_jid.kind_of?(String) ? JID.new(peer_jid) : peer_jid)
35: 
36:         @active = false
37:         @seq_send = 0
38:         @seq_recv = 0
39:         @queue = []
40:         @queue_lock = Mutex.new
41:         @pending = Mutex.new
42:         @pending.lock
43:         @sendbuf = ''
44:         @sendbuf_lock = Mutex.new
45: 
46:         @block_size = 4096  # Recommended by JEP0047
47:       end

Public Instance methods

[Source]

    # File lib/xmpp4r/bytestreams/helper/ibb/base.rb, line 49
49:       def active?
50:         @active
51:       end

Close the stream

Waits for acknowledge from peer, may throw ErrorException

[Source]

     # File lib/xmpp4r/bytestreams/helper/ibb/base.rb, line 129
129:       def close
130:         if active?
131:           flush
132:           deactivate
133: 
134:           iq = Iq.new(:set, @peer_jid)
135:           close = iq.add REXML::Element.new('close')
136:           close.add_namespace IBB::NS_IBB
137:           close.attributes['sid'] = @session_id
138: 
139:           @stream.send_with_id(iq) { |answer|
140:             answer.type == :result
141:           }
142:         end
143:       end

Empty the send-buffer by sending remaining data

[Source]

    # File lib/xmpp4r/bytestreams/helper/ibb/base.rb, line 73
73:       def flush
74:         @sendbuf_lock.synchronize {
75:           while @sendbuf.size > 0
76:             send_data(@sendbuf[0..@block_size-1])
77:             @sendbuf = @sendbuf[@block_size..-1].to_s
78:           end
79:         }
80:       end

Receive data

Will wait until the Message with the next sequence number is in the stanza queue.

[Source]

     # File lib/xmpp4r/bytestreams/helper/ibb/base.rb, line 87
 87:       def read
 88:         if active?
 89:           res = nil
 90: 
 91:           while res.nil?
 92:             @queue_lock.synchronize {
 93:               @queue.each { |item|
 94:                 # Find next data
 95:                 if item.type == :data and item.seq == @seq_recv.to_s
 96:                   res = item
 97:                   break
 98:                 # No data? Find close
 99:                 elsif item.type == :close and res.nil?
100:                   res = item
101:                 end
102:               }
103: 
104:               @queue.delete_if { |item| item == res }
105:             }
106: 
107:             # No data? Wait for next to arrive...
108:             @pending.lock unless res
109:           end
110: 
111:           if res.type == :data
112:             @seq_recv += 1
113:             @seq_recv = 0 if @seq_recv > 65535
114:             res.data
115:           elsif res.type == :close
116:             deactivate
117:             nil # Closed
118:           end
119:         else
120:           nil
121:         end
122:       end

Send data

Data is buffered to match block_size in each packet. If you need the data to be sent immediately, use flush afterwards.

buf:[String]

[Source]

    # File lib/xmpp4r/bytestreams/helper/ibb/base.rb, line 60
60:       def write(buf)
61:         @sendbuf_lock.synchronize {
62:           @sendbuf += buf
63: 
64:           while @sendbuf.size >= @block_size
65:             send_data(@sendbuf[0..@block_size-1])
66:             @sendbuf = @sendbuf[@block_size..-1].to_s
67:           end
68:         }
69:       end

Private Instance methods

[Source]

     # File lib/xmpp4r/bytestreams/helper/ibb/base.rb, line 183
183:       def activate
184:         unless active?
185:           @stream.add_message_callback(200, self) { |msg|
186:             data = msg.first_element('data')
187:             if msg.from == @peer_jid and msg.to == @my_jid and data and data.attributes['sid'] == @session_id
188:               if msg.type == nil
189:                 @queue_lock.synchronize {
190:                   @queue.push IBBQueueItem.new(:data, data.attributes['seq'], data.text.to_s)
191:                   @pending.unlock
192:                 }
193:               elsif msg.type == :error
194:                 @queue_lock.synchronize {
195:                   @queue << IBBQueueItem.new(:close)
196:                   @pending.unlock
197:                 }
198:               end
199:               true
200:             else
201:               false
202:             end
203:           }
204: 
205:           @stream.add_iq_callback(200, self) { |iq|
206:             close = iq.first_element('close')
207:             if iq.type == :set and close and close.attributes['sid'] == @session_id
208:               answer = iq.answer(false)
209:               answer.type = :result
210:               @stream.send(answer)
211: 
212:               @queue_lock.synchronize {
213:                 @queue << IBBQueueItem.new(:close)
214:                 @pending.unlock
215:               }
216:               true
217:             else
218:               false
219:             end
220:           }
221: 
222:           @active = true
223:         end
224:       end

[Source]

     # File lib/xmpp4r/bytestreams/helper/ibb/base.rb, line 226
226:       def deactivate
227:         if active?
228:           @stream.delete_message_callback(self)
229:           @stream.delete_iq_callback(self)
230: 
231:           @active = false
232:         end
233:       end

Send data directly

data:[String]

[Source]

     # File lib/xmpp4r/bytestreams/helper/ibb/base.rb, line 150
150:       def send_data(databuf)
151:         if active?
152:           msg = Message.new
153:           msg.from = @my_jid
154:           msg.to = @peer_jid
155:           
156:           data = msg.add REXML::Element.new('data')
157:           data.add_namespace NS_IBB
158:           data.attributes['sid'] = @session_id
159:           data.attributes['seq'] = @seq_send.to_s
160:           data.text = Base64::encode64 databuf
161: 
162:           # TODO: Implement AMP correctly
163:           amp = msg.add REXML::Element.new('amp')
164:           amp.add_namespace 'http://jabber.org/protocol/amp'
165:           deliver_at = amp.add REXML::Element.new('rule')
166:           deliver_at.attributes['condition'] = 'deliver-at'
167:           deliver_at.attributes['value'] = 'stored'
168:           deliver_at.attributes['action'] = 'error'
169:           match_resource = amp.add REXML::Element.new('rule')
170:           match_resource.attributes['condition'] = 'match-resource'
171:           match_resource.attributes['value'] = 'exact'
172:           match_resource.attributes['action'] = 'error'
173:    
174:           @stream.send(msg)
175: 
176:           @seq_send += 1
177:           @seq_send = 0 if @seq_send > 65535
178:         else
179:           raise 'Attempt to send data when not activated'
180:         end
181:       end

[Validate]