View Javadoc

1   //========================================================================
2   //Copyright 2004-2008 Mort Bay Consulting Pty. Ltd.
3   //------------------------------------------------------------------------
4   //Licensed under the Apache License, Version 2.0 (the "License");
5   //you may not use this file except in compliance with the License.
6   //You may obtain a copy of the License at 
7   //http://www.apache.org/licenses/LICENSE-2.0
8   //Unless required by applicable law or agreed to in writing, software
9   //distributed under the License is distributed on an "AS IS" BASIS,
10  //WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11  //See the License for the specific language governing permissions and
12  //limitations under the License.
13  //========================================================================
14  
15  package org.mortbay.io.nio;
16  
17  import java.io.IOException;
18  import java.nio.channels.CancelledKeyException;
19  import java.nio.channels.ClosedSelectorException;
20  import java.nio.channels.SelectableChannel;
21  import java.nio.channels.SelectionKey;
22  import java.nio.channels.Selector;
23  import java.nio.channels.ServerSocketChannel;
24  import java.nio.channels.SocketChannel;
25  import java.util.ArrayList;
26  import java.util.Iterator;
27  import java.util.List;
28  
29  import org.mortbay.component.AbstractLifeCycle;
30  import org.mortbay.io.Connection;
31  import org.mortbay.io.EndPoint;
32  import org.mortbay.log.Log;
33  import org.mortbay.thread.Timeout;
34  
35  
36  /* ------------------------------------------------------------ */
37  /**
38   * The Selector Manager manages and number of SelectSets to allow
39   * NIO scheduling to scale to large numbers of connections.
40   * 
41   * @author gregw
42   *
43   */
44  public abstract class SelectorManager extends AbstractLifeCycle
45  {
46      // TODO Tune these by approx system speed.
47      private static final int __JVMBUG_THRESHHOLD=Integer.getInteger("org.mortbay.io.nio.JVMBUG_THRESHHOLD",512).intValue();
48      private static final int __MONITOR_PERIOD=Integer.getInteger("org.mortbay.io.nio.MONITOR_PERIOD",1000).intValue();
49      private static final int __MAX_SELECTS=Integer.getInteger("org.mortbay.io.nio.MAX_SELECTS",15000).intValue();
50      private static final int __BUSY_PAUSE=Integer.getInteger("org.mortbay.io.nio.BUSY_PAUSE",50).intValue();
51      private static final int __BUSY_KEY=Integer.getInteger("org.mortbay.io.nio.BUSY_KEY",-1).intValue();
52      
53      private boolean _delaySelectKeyUpdate=true;
54      private long _maxIdleTime;
55      private long _lowResourcesConnections;
56      private long _lowResourcesMaxIdleTime;
57      private transient SelectSet[] _selectSet;
58      private int _selectSets=1;
59      private volatile int _set;
60      
61      /* ------------------------------------------------------------ */
62      /**
63       * @param maxIdleTime The maximum period in milli seconds that a connection may be idle before it is closed.
64       * @see {@link #setLowResourcesMaxIdleTime(long)}
65       */
66      public void setMaxIdleTime(long maxIdleTime)
67      {
68          _maxIdleTime=maxIdleTime;
69      }
70      
71      /* ------------------------------------------------------------ */
72      /**
73       * @param selectSets
74       */
75      public void setSelectSets(int selectSets)
76      {
77          long lrc = _lowResourcesConnections * _selectSets; 
78          _selectSets=selectSets;
79          _lowResourcesConnections=lrc/_selectSets;
80      }
81      
82      /* ------------------------------------------------------------ */
83      /**
84       * @return
85       */
86      public long getMaxIdleTime()
87      {
88          return _maxIdleTime;
89      }
90      
91      /* ------------------------------------------------------------ */
92      /**
93       * @return
94       */
95      public int getSelectSets()
96      {
97          return _selectSets;
98      }
99      
100     /* ------------------------------------------------------------ */
101     /**
102      * @return
103      */
104     public boolean isDelaySelectKeyUpdate()
105     {
106         return _delaySelectKeyUpdate;
107     }
108 
109     /* ------------------------------------------------------------ */
110     /** Register a channel
111      * @param channel
112      * @param att Attached Object
113      * @throws IOException
114      */
115     public void register(SocketChannel channel, Object att) throws IOException
116     {
117         int s=_set++; 
118         s=s%_selectSets;
119         SelectSet[] sets=_selectSet;
120         if (sets!=null)
121         {
122             SelectSet set=sets[s];
123             set.addChange(channel,att);
124             set.wakeup();
125         }
126     }
127     
128     /* ------------------------------------------------------------ */
129     /** Register a serverchannel
130      * @param acceptChannel
131      * @return
132      * @throws IOException
133      */
134     public void register(ServerSocketChannel acceptChannel) throws IOException
135     {
136         int s=_set++; 
137         s=s%_selectSets;
138         SelectSet set=_selectSet[s];
139         set.addChange(acceptChannel);
140         set.wakeup();
141     }
142 
143     /* ------------------------------------------------------------ */
144     /**
145      * @return the lowResourcesConnections
146      */
147     public long getLowResourcesConnections()
148     {
149         return _lowResourcesConnections*_selectSets;
150     }
151 
152     /* ------------------------------------------------------------ */
153     /**
154      * Set the number of connections, which if exceeded places this manager in low resources state.
155      * This is not an exact measure as the connection count is averaged over the select sets.
156      * @param lowResourcesConnections the number of connections
157      * @see {@link #setLowResourcesMaxIdleTime(long)}
158      */
159     public void setLowResourcesConnections(long lowResourcesConnections)
160     {
161         _lowResourcesConnections=(lowResourcesConnections+_selectSets-1)/_selectSets;
162     }
163 
164     /* ------------------------------------------------------------ */
165     /**
166      * @return the lowResourcesMaxIdleTime
167      */
168     public long getLowResourcesMaxIdleTime()
169     {
170         return _lowResourcesMaxIdleTime;
171     }
172 
173     /* ------------------------------------------------------------ */
174     /**
175      * @param lowResourcesMaxIdleTime the period in ms that a connection is allowed to be idle when this SelectSet has more connections than {@link #getLowResourcesConnections()}
176      * @see {@link #setMaxIdleTime(long)}
177      */
178     public void setLowResourcesMaxIdleTime(long lowResourcesMaxIdleTime)
179     {
180         _lowResourcesMaxIdleTime=lowResourcesMaxIdleTime;
181     }
182     
183     /* ------------------------------------------------------------ */
184     /**
185      * @param acceptorID
186      * @throws IOException
187      */
188     public void doSelect(int acceptorID) throws IOException
189     {
190         SelectSet[] sets= _selectSet;
191         if (sets!=null && sets.length>acceptorID && sets[acceptorID]!=null)
192             sets[acceptorID].doSelect();
193     }
194 
195 
196     /* ------------------------------------------------------------ */
197     /**
198      * @param delaySelectKeyUpdate
199      */
200     public void setDelaySelectKeyUpdate(boolean delaySelectKeyUpdate)
201     {
202         _delaySelectKeyUpdate=delaySelectKeyUpdate;
203     }
204 
205     /* ------------------------------------------------------------ */
206     /**
207      * @param key
208      * @return
209      * @throws IOException 
210      */
211     protected abstract SocketChannel acceptChannel(SelectionKey key) throws IOException;
212 
213     /* ------------------------------------------------------------------------------- */
214     public abstract boolean dispatch(Runnable task) throws IOException;
215 
216     /* ------------------------------------------------------------ */
217     /* (non-Javadoc)
218      * @see org.mortbay.component.AbstractLifeCycle#doStart()
219      */
220     protected void doStart() throws Exception
221     {
222         _selectSet = new SelectSet[_selectSets];
223         for (int i=0;i<_selectSet.length;i++)
224             _selectSet[i]= new SelectSet(i);
225 
226         super.doStart();
227     }
228 
229 
230     /* ------------------------------------------------------------------------------- */
231     protected void doStop() throws Exception
232     {
233         SelectSet[] sets= _selectSet;
234         _selectSet=null;
235         if (sets!=null)
236             for (int i=0;i<sets.length;i++)
237             {
238                 SelectSet set = sets[i];
239                 if (set!=null)
240                     set.stop();
241             }
242         super.doStop();
243     }
244 
245     /* ------------------------------------------------------------ */
246     /**
247      * @param endpoint
248      */
249     protected abstract void endPointClosed(SelectChannelEndPoint endpoint);
250 
251     /* ------------------------------------------------------------ */
252     /**
253      * @param endpoint
254      */
255     protected abstract void endPointOpened(SelectChannelEndPoint endpoint);
256 
257     /* ------------------------------------------------------------------------------- */
258     protected abstract Connection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint);
259 
260     /* ------------------------------------------------------------ */
261     /**
262      * @param channel
263      * @param selectSet
264      * @param sKey
265      * @return
266      * @throws IOException
267      */
268     protected abstract SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectorManager.SelectSet selectSet, SelectionKey sKey) throws IOException;
269 
270     /* ------------------------------------------------------------------------------- */
271     protected void connectionFailed(SocketChannel channel,Throwable ex,Object attachment)
272     {
273         Log.warn(ex);
274     }
275     
276     /* ------------------------------------------------------------------------------- */
277     /* ------------------------------------------------------------------------------- */
278     /* ------------------------------------------------------------------------------- */
279     public class SelectSet 
280     {
281         private transient int _change;
282         private transient List[] _changes;
283         private transient Timeout _idleTimeout;
284         private transient int _nextSet;
285         private transient Timeout _retryTimeout;
286         private transient Selector _selector;
287         private transient int _setID;
288         private volatile boolean _selecting;
289         private transient int _jvmBug;
290         private int _selects;
291         private long _monitorStart;
292         private long _monitorNext;
293         private boolean _pausing;
294         private SelectionKey _busyKey;
295         private int _busyKeyCount;
296         private long _log;
297         private int _paused;
298         private int _jvmFix0;
299         private int _jvmFix1;
300         private int _jvmFix2;
301         
302         /* ------------------------------------------------------------ */
303         SelectSet(int acceptorID) throws Exception
304         {
305             _setID=acceptorID;
306 
307             _idleTimeout = new Timeout(this);
308             _idleTimeout.setDuration(getMaxIdleTime());
309             _retryTimeout = new Timeout(this);
310             _retryTimeout.setDuration(0L);
311 
312             // create a selector;
313             _selector = Selector.open();
314             _changes = new ArrayList[] {new ArrayList(),new ArrayList()};
315             _change=0;
316             _monitorStart=System.currentTimeMillis();
317             _monitorNext=_monitorStart+__MONITOR_PERIOD;
318             _log=_monitorStart+60000;
319         }
320         
321         /* ------------------------------------------------------------ */
322         public void addChange(Object point)
323         {
324             synchronized (_changes)
325             {
326                 _changes[_change].add(point);
327             }
328         }
329         
330         /* ------------------------------------------------------------ */
331         public void addChange(SelectableChannel channel, Object att)
332         {   
333             if (att==null)
334                 addChange(channel);
335             else if (att instanceof EndPoint)
336                 addChange(att);
337             else
338                 addChange(new ChangeSelectableChannel(channel,att));
339         }
340         
341         /* ------------------------------------------------------------ */
342         public void cancelIdle(Timeout.Task task)
343         {
344             synchronized (this)
345             {
346                 task.cancel();
347             }
348         }
349 
350         /* ------------------------------------------------------------ */
351         /**
352          * Select and dispatch tasks found from changes and the selector.
353          * 
354          * @throws IOException
355          */
356         public void doSelect() throws IOException
357         {
358             SelectionKey key=null;
359             
360             try
361             {
362                 List changes;
363                 final Selector selector;
364                 synchronized (_changes)
365                 {
366                     changes=_changes[_change];
367                     _change=_change==0?1:0;
368                     _selecting=true;
369                     selector=_selector;
370                 }
371 
372                 // Make any key changes required
373                 for (int i = 0; i < changes.size(); i++)
374                 {
375                     try
376                     {
377                         Object o = changes.get(i);
378                         
379                         if (o instanceof EndPoint)
380                         {
381                             // Update the operations for a key.
382                             SelectChannelEndPoint endpoint = (SelectChannelEndPoint)o;
383                             endpoint.doUpdateKey();
384                         }
385                         else if (o instanceof Runnable)
386                         {
387                             dispatch((Runnable)o);
388                         }
389                         else if (o instanceof ChangeSelectableChannel)
390                         {
391                             // finish accepting/connecting this connection
392                             final ChangeSelectableChannel asc = (ChangeSelectableChannel)o;
393                             final SelectableChannel channel=asc._channel;
394                             final Object att = asc._attachment;
395 
396                             if ((channel instanceof SocketChannel) && ((SocketChannel)channel).isConnected())
397                             {
398                                 key = channel.register(selector,SelectionKey.OP_READ,att);
399                                 SelectChannelEndPoint endpoint = newEndPoint((SocketChannel)channel,this,key);
400                                 key.attach(endpoint);
401                                 endpoint.dispatch();
402                             }
403                             else if (channel.isOpen())
404                             {
405                                 channel.register(selector,SelectionKey.OP_CONNECT,att);
406                             }
407                         }
408                         else if (o instanceof SocketChannel)
409                         {
410                             final SocketChannel channel=(SocketChannel)o;
411 
412                             if (channel.isConnected())
413                             {
414                                 key = channel.register(selector,SelectionKey.OP_READ,null);
415                                 SelectChannelEndPoint endpoint = newEndPoint(channel,this,key);
416                                 key.attach(endpoint);
417                                 endpoint.dispatch();
418                             }
419                             else if (channel.isOpen())
420                             {
421                                 channel.register(selector,SelectionKey.OP_CONNECT,null);
422                             }
423                         }
424                         else if (o instanceof ServerSocketChannel)
425                         {
426                             ServerSocketChannel channel = (ServerSocketChannel)o;
427                             channel.register(getSelector(),SelectionKey.OP_ACCEPT);
428                         }
429                         else if (o instanceof ChangeTask)
430                         {
431                             ((ChangeTask)o).run();
432                         }
433                         else
434                             throw new IllegalArgumentException(o.toString());
435                     }
436                     catch (Exception e)
437                     {
438                         if (isRunning())
439                             Log.warn(e);
440                         else
441                             Log.debug(e);
442                     }
443                 }
444                 changes.clear();
445 
446                 long idle_next = 0;
447                 long retry_next = 0;
448                 long now=System.currentTimeMillis();
449                 synchronized (this)
450                 {
451                     _idleTimeout.setNow(now);
452                     _retryTimeout.setNow(now);
453                     if (_lowResourcesConnections>0 && selector.keys().size()>_lowResourcesConnections)
454                         _idleTimeout.setDuration(_lowResourcesMaxIdleTime);
455                     else 
456                         _idleTimeout.setDuration(_maxIdleTime);
457                     idle_next=_idleTimeout.getTimeToNext();
458                     retry_next=_retryTimeout.getTimeToNext();
459                 }
460 
461                 // workout how low to wait in select
462                 long wait = 1000L;  // not getMaxIdleTime() as the now value of the idle timers needs to be updated.
463                 if (idle_next >= 0 && wait > idle_next)
464                     wait = idle_next;
465                 if (wait > 0 && retry_next >= 0 && wait > retry_next)
466                     wait = retry_next;
467     
468                 // Do the select.
469                 if (wait > 2) // TODO tune or configure this
470                 {
471                     // If we are in pausing mode
472                     if (_pausing)
473                     {
474                         try
475                         {
476                             Thread.sleep(__BUSY_PAUSE); // pause to reduce impact of  busy loop
477                         }
478                         catch(InterruptedException e)
479                         {
480                             Log.ignore(e);
481                         }
482                     }
483                         
484                     long before=now;
485                     int selected=selector.select(wait);
486                     now = System.currentTimeMillis();
487                     _idleTimeout.setNow(now);
488                     _retryTimeout.setNow(now);
489                     _selects++;
490 
491                     // Look for JVM bugs over a monitor period.
492                     // http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6403933
493                     // http://bugs.sun.com/view_bug.do?bug_id=6693490
494                     if (now>_monitorNext)
495                     {
496                         _selects=(int)(_selects*__MONITOR_PERIOD/(now-_monitorStart));
497                         _pausing=_selects>__MAX_SELECTS;
498                         if (_pausing)
499                             _paused++;
500                             
501                         _selects=0;
502                         _jvmBug=0;
503                         _monitorStart=now;
504                         _monitorNext=now+__MONITOR_PERIOD;
505                     }
506                     
507                     if (now>_log)
508                     {
509                         if (_paused>0)  
510                             Log.info(this+" Busy selector - injecting delay "+_paused+" times");
511 
512                         if (_jvmFix2>0)
513                             Log.info(this+" JVM BUG(s) - injecting delay"+_jvmFix2+" times");
514 
515                         if (_jvmFix1>0)
516                             Log.info(this+" JVM BUG(s) - recreating selector "+_jvmFix1+" times, canceled keys "+_jvmFix0+" times");
517 
518                         else if(Log.isDebugEnabled() && _jvmFix0>0)
519                             Log.info(this+" JVM BUG(s) - canceled keys "+_jvmFix0+" times");
520                         _paused=0;
521                         _jvmFix2=0;
522                         _jvmFix1=0;
523                         _jvmFix0=0;
524                         _log=now+60000;
525                     }
526                     
527                     // If we see signature of possible JVM bug, increment count.
528                     if (selected==0 && wait>10 && (now-before)<(wait/2))
529                     {
530                         // Increment bug count and try a work around
531                         _jvmBug++;
532                         if (_jvmBug>(__JVMBUG_THRESHHOLD))
533                         {
534                             try
535                             {
536                                 if (_jvmBug==__JVMBUG_THRESHHOLD+1)
537                                     _jvmFix2++;
538                                     
539                                 Thread.sleep(__BUSY_PAUSE); // pause to avoid busy loop
540                             }
541                             catch(InterruptedException e)
542                             {
543                                 Log.ignore(e);
544                             }
545                         }
546                         else if (_jvmBug==__JVMBUG_THRESHHOLD)
547                         {
548                             synchronized (this)
549                             {
550                                 // BLOODY SUN BUG !!!  Try refreshing the entire selector.
551                                 _jvmFix1++;
552                                 
553                                 final Selector new_selector = Selector.open();
554                                 Iterator iterator = _selector.keys().iterator();
555                                 while (iterator.hasNext())
556                                 {
557                                     SelectionKey k = (SelectionKey)iterator.next();
558                                     if (!k.isValid() || k.interestOps()==0)
559                                         continue;
560                                     
561                                     final SelectableChannel channel = k.channel();
562                                     final Object attachment = k.attachment();
563                                     
564                                     if (attachment==null)
565                                         addChange(channel);
566                                     else
567                                         addChange(channel,attachment);
568                                 }
569                                 Selector old_selector=_selector;
570                                 _selector=new_selector;
571                                 try 
572                                 {
573                                     old_selector.close();
574                                 }
575                                 catch(Exception e)
576                                 {
577                                     Log.warn(e);
578                                 }
579                                 return;
580                             }
581                         }
582                         else if (_jvmBug%32==31) // heuristic attempt to cancel key 31,63,95,... loops
583                         {
584                             // Cancel keys with 0 interested ops
585                             int cancelled=0;
586                             Iterator iter = selector.keys().iterator();
587                             while(iter.hasNext())
588                             {
589                                 SelectionKey k = (SelectionKey) iter.next();
590                                 if (k.isValid()&&k.interestOps()==0)
591                                 {
592                                     k.cancel();
593                                     cancelled++;
594                                 }
595                             }
596                             if (cancelled>0)
597                                 _jvmFix0++;
598                             
599                             return;
600                         }
601                     }
602                     else if (__BUSY_KEY>0 && selected==1 && _selects>__MAX_SELECTS)
603                     {
604                         // Look for busy key
605                         SelectionKey busy = (SelectionKey)selector.selectedKeys().iterator().next();
606                         if (busy==_busyKey)
607                         {
608                             if (++_busyKeyCount>__BUSY_KEY && !(busy.channel() instanceof ServerSocketChannel))
609                             {
610                                 final SelectChannelEndPoint endpoint = (SelectChannelEndPoint)busy.attachment();
611                                 Log.warn("Busy Key "+busy.channel()+" "+endpoint);
612                                 busy.cancel();
613                                 if (endpoint!=null)
614                                 {
615                                     dispatch(new Runnable()
616                                     {
617                                         public void run()
618                                         {
619                                             try
620                                             {
621                                                 endpoint.close();
622                                             }
623                                             catch (IOException e)
624                                             {
625                                                 Log.ignore(e);
626                                             }
627                                         }
628                                     });
629                                 }
630                             }
631                         }
632                         else
633                             _busyKeyCount=0;
634                         _busyKey=busy;
635                     }
636                 }
637                 else 
638                 {
639                     selector.selectNow();
640                     _selects++;
641                 }
642 
643                 // have we been destroyed while sleeping
644                 if (_selector==null || !selector.isOpen())
645                     return;
646 
647                 // Look for things to do
648                 Iterator iter = selector.selectedKeys().iterator();
649                 while (iter.hasNext())
650                 {
651                     key = (SelectionKey) iter.next();
652                                         
653                     try
654                     {
655                         if (!key.isValid())
656                         {
657                             key.cancel();
658                             SelectChannelEndPoint endpoint = (SelectChannelEndPoint)key.attachment();
659                             if (endpoint != null)
660                                 endpoint.doUpdateKey();
661                             continue;
662                         }
663                         
664                         Object att = key.attachment();
665                         
666                         if (att instanceof SelectChannelEndPoint)
667                         {
668                             SelectChannelEndPoint endpoint = (SelectChannelEndPoint)att;
669                             endpoint.dispatch();
670                         }
671                         else if (key.isAcceptable())
672                         {
673                             SocketChannel channel = acceptChannel(key);
674                             if (channel==null)
675                                 continue;
676 
677                             channel.configureBlocking(false);
678 
679                             // TODO make it reluctant to leave 0
680                             _nextSet=++_nextSet%_selectSet.length;
681 
682                             // Is this for this selectset
683                             if (_nextSet==_setID)
684                             {
685                                 // bind connections to this select set.
686                                 SelectionKey cKey = channel.register(_selectSet[_nextSet].getSelector(), SelectionKey.OP_READ);
687                                 SelectChannelEndPoint endpoint=newEndPoint(channel,_selectSet[_nextSet],cKey);
688                                 cKey.attach(endpoint);
689                                 if (endpoint != null)
690                                     endpoint.dispatch();
691                             }
692                             else
693                             {
694                                 // nope - give it to another.
695                                 _selectSet[_nextSet].addChange(channel);
696                                 _selectSet[_nextSet].wakeup();
697                             }
698                         }
699                         else if (key.isConnectable())
700                         {
701                             // Complete a connection of a registered channel
702                             SocketChannel channel = (SocketChannel)key.channel();
703                             boolean connected=false;
704                             try
705                             {
706                                 connected=channel.finishConnect();
707                             }
708                             catch(Exception e)
709                             {
710                                 connectionFailed(channel,e,att);
711                             }
712                             finally
713                             {
714                                 if (connected)
715                                 {
716                                     key.interestOps(SelectionKey.OP_READ);
717                                     SelectChannelEndPoint endpoint = newEndPoint(channel,this,key);
718                                     key.attach(endpoint);
719                                     endpoint.dispatch();
720                                 }
721                                 else
722                                 {
723                                     key.cancel();
724                                 }
725                             }
726                         }
727                         else
728                         {
729                             // Wrap readable registered channel in an endpoint
730                             SocketChannel channel = (SocketChannel)key.channel();
731                             SelectChannelEndPoint endpoint = newEndPoint(channel,this,key);
732                             key.attach(endpoint);
733                             if (key.isReadable())
734                                 endpoint.dispatch();                           
735                         }
736                         key = null;
737                     }
738                     catch (CancelledKeyException e)
739                     {
740                         Log.ignore(e);
741                     }
742                     catch (Exception e)
743                     {
744                         if (isRunning())
745                             Log.warn(e);
746                         else
747                             Log.ignore(e);
748 
749                         if (key != null && !(key.channel() instanceof ServerSocketChannel) && key.isValid())
750                         {
751                             key.interestOps(0);
752 
753                             key.cancel();
754                         } 
755                     }
756                 }
757                 
758                 // Everything always handled
759                 selector.selectedKeys().clear();
760 
761                 // tick over the timers
762                 _idleTimeout.tick(now);
763                 _retryTimeout.tick(now);
764                 
765             }
766             catch (ClosedSelectorException e)
767             {
768                 Log.warn(e);
769             }
770             catch (CancelledKeyException e)
771             {
772                 Log.ignore(e);
773             }
774             finally
775             {
776                 _selecting=false;
777             }
778         }
779 
780         /* ------------------------------------------------------------ */
781         public SelectorManager getManager()
782         {
783             return SelectorManager.this;
784         }
785 
786         /* ------------------------------------------------------------ */
787         public long getNow()
788         {
789             return _idleTimeout.getNow();
790         }
791         
792         /* ------------------------------------------------------------ */
793         public void scheduleIdle(Timeout.Task task)
794         {
795             synchronized (this)
796             {
797                 if (_idleTimeout.getDuration() <= 0)
798                     return;
799                 
800                 task.schedule(_idleTimeout);
801             }
802         }
803 
804         /* ------------------------------------------------------------ */
805         public void scheduleTimeout(Timeout.Task task, long timeout)
806         {
807             synchronized (this)
808             {
809                 _retryTimeout.schedule(task, timeout);
810             }
811         }
812 
813         /* ------------------------------------------------------------ */
814         public void wakeup()
815         {
816             Selector selector = _selector;
817             if (selector!=null)
818                 selector.wakeup();
819         }
820 
821         /* ------------------------------------------------------------ */
822         Selector getSelector()
823         {
824             return _selector;
825         }
826         
827         /* ------------------------------------------------------------ */
828         void stop() throws Exception
829         {
830             boolean selecting=true;
831             while(selecting)
832             {
833                 wakeup();
834                 selecting=_selecting;
835             }
836             
837             ArrayList keys=new ArrayList(_selector.keys());
838             Iterator iter =keys.iterator();
839 
840             while (iter.hasNext())
841             {
842                 SelectionKey key = (SelectionKey)iter.next();
843                 if (key==null)
844                     continue;
845                 Object att=key.attachment();
846                 if (att instanceof EndPoint)
847                 {
848                     EndPoint endpoint = (EndPoint)att;
849                     try
850                     {
851                         endpoint.close();
852                     }
853                     catch(IOException e)
854                     {
855                         Log.ignore(e);
856                     }
857                 }
858             }
859             
860             synchronized (this)
861             {
862                 selecting=_selecting;
863                 while(selecting)
864                 {
865                     wakeup();
866                     selecting=_selecting;
867                 }
868                 
869                 _idleTimeout.cancelAll();
870                 _retryTimeout.cancelAll();
871                 try
872                 {
873                     if (_selector != null)
874                         _selector.close();
875                 }
876                 catch (IOException e)
877                 {
878                     Log.ignore(e);
879                 } 
880                 _selector=null;
881             }
882         }
883     }
884 
885     /* ------------------------------------------------------------ */
886     private static class ChangeSelectableChannel
887     {
888         final SelectableChannel _channel;
889         final Object _attachment;
890         
891         public ChangeSelectableChannel(SelectableChannel channel, Object attachment)
892         {
893             super();
894             _channel = channel;
895             _attachment = attachment;
896         }
897     }
898 
899     /* ------------------------------------------------------------ */
900     private interface ChangeTask
901     {
902         public void run();
903     }
904 }