Index: ../../Repositories/repo/icefaces-ee3/trunk/icefaces-ee/icepush-ee/eps/src/main/java/com/icesoft/push/servlet/ICEpushServlet.java =================================================================== --- ../../Repositories/repo/icefaces-ee3/trunk/icefaces-ee/icepush-ee/eps/src/main/java/com/icesoft/push/servlet/ICEpushServlet.java (revision 30958) +++ ../../Repositories/repo/icefaces-ee3/trunk/icefaces-ee/icepush-ee/eps/src/main/java/com/icesoft/push/servlet/ICEpushServlet.java (revision ) @@ -17,8 +17,10 @@ */ package com.icesoft.push.servlet; +import com.icesoft.icepush.MainServlet; import com.icesoft.net.messaging.MessageServiceClient; import com.icesoft.net.messaging.jms.JMSAdapter; +import com.icesoft.push.LocalPushGroupManager; import com.icesoft.push.ProductInfo; import com.icesoft.push.messaging.PushMessageService; import com.icesoft.util.ThreadFactory; @@ -35,13 +37,12 @@ import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; +import org.icepush.Configuration; +import org.icepush.PushContext; import org.icepush.PushGroupAdapter; import org.icepush.PushGroupEvent; -import org.icepush.servlet.MainServlet; -import org.icepush.servlet.ServletContextConfiguration; -import org.icepush.Configuration; import org.icepush.PushGroupManager; -import org.icepush.PushContext; +import org.icepush.servlet.ServletContextConfiguration; public class ICEpushServlet extends HttpServlet { @@ -68,9 +69,6 @@ } super.init(servletConfig); ServletContext _servletContext = servletConfig.getServletContext(); - _servletContext.setAttribute("scanning", Boolean.FALSE); - mainServlet = new MainServlet(_servletContext, false, false); - PushGroupManager _pushGroupManager = mainServlet.getPushGroupManager(); Configuration _configuration = new ServletContextConfiguration("com.icesoft.push", _servletContext); ThreadFactory _threadFactory = new ThreadFactory(); _threadFactory.setPrefix(ProductInfo.PRODUCT + " Thread"); @@ -80,6 +78,9 @@ if (LOGGER.isLoggable(Level.FINE)) { LOGGER.log(Level.FINE, ProductInfo.PRODUCT + " - Thread Pool: " + executor.getCorePoolSize()); } + _servletContext.setAttribute("org.icepush.PushGroupManager", "com.icesoft.push.LocalPushGroupManager"); + mainServlet = new MainServlet(_servletContext, false, false, executor); + PushGroupManager _pushGroupManager = mainServlet.getPushGroupManager(); pushMessageService = new PushMessageService( PushContext.getInstance(_servletContext), @@ -91,6 +92,7 @@ _pushGroupManager); pushMessageService.setUp(); pushMessageService.start(); + ((LocalPushGroupManager)_pushGroupManager).setPushMessageService(pushMessageService); _pushGroupManager.addPushGroupListener( new PushGroupAdapter() { public void pushIDTouched(final PushGroupEvent event) { Index: ../../Repositories/repo/icefaces-ee3/trunk/icefaces-ee/icepush-ee/core/src/main/java/org/icepush/PushGroupManagerFactory.java =================================================================== --- ../../Repositories/repo/icefaces-ee3/trunk/icefaces-ee/icepush-ee/core/src/main/java/org/icepush/PushGroupManagerFactory.java (revision 29921) +++ ../../Repositories/repo/icefaces-ee3/trunk/icefaces-ee/icepush-ee/core/src/main/java/org/icepush/PushGroupManagerFactory.java (revision ) @@ -20,6 +20,7 @@ import java.lang.reflect.InvocationTargetException; import java.util.HashSet; import java.util.Set; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.logging.Level; import java.util.logging.Logger; @@ -30,8 +31,22 @@ public class PushGroupManagerFactory { private static final Logger LOGGER = Logger.getLogger(PushGroupManagerFactory.class.getName()); - public static PushGroupManager newPushGroupManager(final ServletContext servletContext, final Configuration configuration) { - String _pushGroupManager = configuration.getAttribute("pushGroupManager", null); + public static PushGroupManager newPushGroupManager( + final ServletContext servletContext, final Configuration configuration) { + + return newPushGroupManager(servletContext, null, configuration); + } + + public static PushGroupManager newPushGroupManager( + final ServletContext servletContext, final ScheduledThreadPoolExecutor executor, + final Configuration configuration) { + + String _pushGroupManager = (String)servletContext.getAttribute("org.icepush.PushGroupManager"); + LOGGER.log(Level.INFO, "org.icepush.PushGroupManager [1]: " + _pushGroupManager); + if (_pushGroupManager == null) { + _pushGroupManager = configuration.getAttribute("pushGroupManager", null); + } + LOGGER.log(Level.INFO, "org.icepush.PushGroupManager [1]: " + _pushGroupManager); if (_pushGroupManager == null || _pushGroupManager.trim().length() == 0) { LOGGER.log(Level.FINEST, "Using annotation scanner to find @ExtendedPushGroupManager."); Set _annotationSet = new HashSet(); @@ -42,15 +57,27 @@ // throws IOException for (Class _class : _classes) { try { + if (executor == null) { - return - (PushGroupManager) - _class. - // throws NoSuchMethodException, SecurityException - getConstructor(ServletContext.class). - // throws - // IllegalAccessException, IllegalArgumentException, InstantiationException, - // InvocationTargetException, ExceptionInInitializerError - newInstance(servletContext); + return + (PushGroupManager) + _class. + // throws NoSuchMethodException, SecurityException + getConstructor(ServletContext.class). + // throws + // IllegalAccessException, IllegalArgumentException, InstantiationException, + // InvocationTargetException, ExceptionInInitializerError + newInstance(servletContext); + } else { + return + (PushGroupManager) + _class. + // throws NoSuchMethodException, SecurityException + getConstructor(ServletContext.class, ScheduledThreadPoolExecutor.class). + // throws + // IllegalAccessException, IllegalArgumentException, InstantiationException, + // InvocationTargetException, ExceptionInInitializerError + newInstance(servletContext, executor); + } } catch (NoSuchMethodException exception) { LOGGER.log(Level.FINEST, "Can't get constructor!", exception); // Do nothing. @@ -86,43 +113,55 @@ } else { LOGGER.log(Level.FINEST, "PushGroupManager: " + _pushGroupManager); try { + if (executor == null) { - return - (PushGroupManager) - // throws ClassNotFoundException, ExceptionInInitializerError - Class.forName(_pushGroupManager). - // throws NoSuchMethodException, SecurityException - getConstructor(ServletContext.class). - // throws - // IllegalAccessException, IllegalArgumentException, InstantiationException, - // InvocationTargetException, ExceptionInInitializerError - newInstance(servletContext); + return + (PushGroupManager) + // throws ClassNotFoundException, ExceptionInInitializerError + Class.forName(_pushGroupManager). + // throws NoSuchMethodException, SecurityException + getConstructor(ServletContext.class). + // throws + // IllegalAccessException, IllegalArgumentException, InstantiationException, + // InvocationTargetException, ExceptionInInitializerError + newInstance(servletContext); + } else { + return + (PushGroupManager) + Class.forName(_pushGroupManager). + // throws NoSuchMethodException, SecurityException + getConstructor(ServletContext.class, ScheduledThreadPoolExecutor.class). + // throws + // IllegalAccessException, IllegalArgumentException, InstantiationException, + // InvocationTargetException, ExceptionInInitializerError + newInstance(servletContext, executor); + } } catch (ClassNotFoundException exception) { - LOGGER.log(Level.FINEST, "Can't find class!", exception); + LOGGER.log(Level.INFO, "Can't find class!", exception); // Do nothing. } catch (ExceptionInInitializerError error) { - LOGGER.log(Level.FINEST, "Can't find class or can't create instance!", error); + LOGGER.log(Level.INFO, "Can't find class or can't create instance!", error); // Do nothing. } catch (NoSuchMethodException exception) { - LOGGER.log(Level.FINEST, "Can't get constructor!", exception); + LOGGER.log(Level.INFO, "Can't get constructor!", exception); // Do nothing. } catch (SecurityException exception) { - LOGGER.log(Level.FINEST, "Can't get constructor!", exception); + LOGGER.log(Level.INFO, "Can't get constructor!", exception); // Do nothing. } catch (IllegalAccessException exception) { - LOGGER.log(Level.FINEST, "Can't create instance!", exception); + LOGGER.log(Level.INFO, "Can't create instance!", exception); // Do nothing. } catch (IllegalArgumentException exception) { - LOGGER.log(Level.FINEST, "Can't create instance!", exception); + LOGGER.log(Level.INFO, "Can't create instance!", exception); // Do nothing. } catch (InstantiationException exception) { - LOGGER.log(Level.FINEST, "Can't create instance!", exception); + LOGGER.log(Level.INFO, "Can't create instance!", exception); // Do nothing. } catch (InvocationTargetException exception) { - LOGGER.log(Level.FINEST, "Can't create instance!", exception); + LOGGER.log(Level.INFO, "Can't create instance!", exception); // Do nothing. } } - LOGGER.log(Level.FINEST, "Falling back to LocalPushGroupManager."); + LOGGER.log(Level.INFO, "Falling back to LocalPushGroupManager."); return new LocalPushGroupManager(servletContext); } } Index: ../../Repositories/repo/icefaces-ee3/trunk/icefaces-ee/icepush-ee/core/src/main/java/org/icepush/PushGroupManager.java =================================================================== --- ../../Repositories/repo/icefaces-ee3/trunk/icefaces-ee/icepush-ee/core/src/main/java/org/icepush/PushGroupManager.java (revision 28010) +++ ../../Repositories/repo/icefaces-ee3/trunk/icefaces-ee/icepush-ee/core/src/main/java/org/icepush/PushGroupManager.java (revision ) @@ -47,4 +47,16 @@ Map getGroupMap(); void shutdown(); + + void recordListen(List pushIDList, int sequenceNumber); + + void startConfirmationTimeout(String[] pushIDs); + + void cancelConfirmationTimeout(String[] pushIDs); + + void startExpiryTimeout(String[] pushIDs); + + void cancelExpiryTimeout(String[] pushIDs); + + void scan(String[] confirmedPushIDs); } Index: ../../Repositories/repo/icefaces-ee3/trunk/icefaces-ee/icepush-ee/core/src/main/java/org/icepush/servlet/MainServlet.java =================================================================== --- ../../Repositories/repo/icefaces-ee3/trunk/icefaces-ee/icepush-ee/core/src/main/java/org/icepush/servlet/MainServlet.java (revision 29921) +++ ../../Repositories/repo/icefaces-ee3/trunk/icefaces-ee/icepush-ee/core/src/main/java/org/icepush/servlet/MainServlet.java (revision ) @@ -32,6 +32,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Timer; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.logging.Level; import java.util.logging.Logger; @@ -52,6 +53,7 @@ public MainServlet(final ServletContext servletContext, final boolean terminateBlockingConnectionOnShutdown) { + this(servletContext,terminateBlockingConnectionOnShutdown,true); } @@ -59,6 +61,14 @@ final boolean terminateBlockingConnectionOnShutdown, final boolean printProductInfo) { + this(servletContext, terminateBlockingConnectionOnShutdown, printProductInfo, null); + } + + public MainServlet(final ServletContext servletContext, + final boolean terminateBlockingConnectionOnShutdown, + final boolean printProductInfo, + final ScheduledThreadPoolExecutor executor) { + //We print the product info unless we are part of EE which will print out it's //own version. if(printProductInfo){ @@ -70,7 +80,7 @@ monitoringScheduler = new Timer("Monitoring scheduler", true); configuration = new ServletContextConfiguration("org.icepush", context); pushContext = new PushContext(context); - pushGroupManager = PushGroupManagerFactory.newPushGroupManager(context, configuration); + pushGroupManager = PushGroupManagerFactory.newPushGroupManager(context, executor, configuration); pushContext.setPushGroupManager(pushGroupManager); dispatcher = new PathDispatcher(); new DefaultOutOfBandNotifier(servletContext); Index: ../../Repositories/repo/icefaces-ee3/trunk/icefaces-ee/icepush-ee/eps/src/main/java/com/icesoft/push/messaging/PushMessageService.java =================================================================== --- ../../Repositories/repo/icefaces-ee3/trunk/icefaces-ee/icepush-ee/eps/src/main/java/com/icesoft/push/messaging/PushMessageService.java (revision 30473) +++ ../../Repositories/repo/icefaces-ee3/trunk/icefaces-ee/icepush-ee/eps/src/main/java/com/icesoft/push/messaging/PushMessageService.java (revision ) @@ -62,6 +62,7 @@ public static final String CLEAR_PENDING_NOTIFICATIONS_MESSAGE_TYPE = "ClearPendingNotifications"; public static final String PUSH_MESSAGE_TYPE = "Push"; public static final String PUSH_ID_TOUCHED_MESSAGE_TYPE = "PushIDTouched"; + public static final String LISTENING_PUSH_IDS_MESSAGE_TYPE = "ListeningPushIDs"; public static final String REMOVE_GROUP_MEMBER_MESSAGE_TYPE = "RemoveGroupMember"; private static final String PRODUCT_MESSAGE = @@ -90,7 +91,8 @@ private final MessageHandler addGroupMemberMessageHandler; private final MessageHandler clearPendingNotificationsMessageHandler; private final MessageHandler pushMessageHandler; - private final MessageHandler pushIdTouchedMessageHandler; +// private final MessageHandler pushIdTouchedMessageHandler; + private final MessageHandler listeningPushIDs; private final MessageHandler removeGroupMemberMessageHandler; public PushMessageService( @@ -227,29 +229,54 @@ } } }; - pushIdTouchedMessageHandler = +// pushIdTouchedMessageHandler = +// new AbstractMessageHandler( +// new MessageSelector( +// new And( +// new Equal( +// new Identifier(Message.MESSAGE_TYPE), +// new StringLiteral(PUSH_ID_TOUCHED_MESSAGE_TYPE)), +// new NotEqual( +// new Identifier(Message.SOURCE_NODE_ADDRESS), +// new StringLiteral(HOST_ADDRESS))))) { +// +// public void handle(final Message message) { +// if (message != null && message instanceof TextMessage) { +// if (LOGGER.isLoggable(Level.FINE)) { +// LOGGER.log(Level.FINE, "Handling message:\r\n\r\n" + message); +// } +// /* +// * message-body: +// * , +// */ +// StringTokenizer _tokens = new StringTokenizer(((TextMessage)message).getText(), ","); +// ((LocalPushGroupManager)pushGroupManager). +// touchPushID(_tokens.nextToken(), Long.valueOf(_tokens.nextToken())); +// } +// } +// }; + listeningPushIDs = new AbstractMessageHandler( new MessageSelector( new And( new Equal( new Identifier(Message.MESSAGE_TYPE), - new StringLiteral(PUSH_ID_TOUCHED_MESSAGE_TYPE)), + new StringLiteral(LISTENING_PUSH_IDS_MESSAGE_TYPE)), new NotEqual( new Identifier(Message.SOURCE_NODE_ADDRESS), new StringLiteral(HOST_ADDRESS))))) { public void handle(final Message message) { - if (message != null && message instanceof TextMessage) { + if (message != null && message instanceof ObjectMessage) { if (LOGGER.isLoggable(Level.FINE)) { LOGGER.log(Level.FINE, "Handling message:\r\n\r\n" + message); } /* * message-body: - * , + * Map */ - StringTokenizer _tokens = new StringTokenizer(((TextMessage)message).getText(), ","); ((LocalPushGroupManager)pushGroupManager). - touchPushID(_tokens.nextToken(), Long.valueOf(_tokens.nextToken())); + listeningPushIDs((Map)((ObjectMessage)message).getObject()); } } }; @@ -288,7 +315,7 @@ addGroupMemberMessageHandler.getMessageSelector().getExpression(), clearPendingNotificationsMessageHandler.getMessageSelector().getExpression(), pushMessageHandler.getMessageSelector().getExpression(), - pushIdTouchedMessageHandler.getMessageSelector().getExpression(), +// pushIdTouchedMessageHandler.getMessageSelector().getExpression(), removeGroupMemberMessageHandler.getMessageSelector().getExpression()))); _messageServiceClient.addMessageHandlers( announcementMessageHandler, @@ -296,7 +323,8 @@ addGroupMemberMessageHandler, clearPendingNotificationsMessageHandler, pushMessageHandler, - pushIdTouchedMessageHandler, +// pushIdTouchedMessageHandler, + listeningPushIDs, removeGroupMemberMessageHandler); Properties _messageProperties = new Properties(); _messageServiceClient.publishNow(PRODUCT_MESSAGE, _messageProperties, ANNOUNCEMENT_MESSAGE_TYPE); @@ -306,7 +334,8 @@ throws MessageServiceException { getMessageServiceClient().removeMessageHandlers( removeGroupMemberMessageHandler, - pushIdTouchedMessageHandler, + listeningPushIDs, +// pushIdTouchedMessageHandler, pushMessageHandler, clearPendingNotificationsMessageHandler, addGroupMemberMessageHandler, Index: ../../Repositories/repo/icefaces-ee3/trunk/icefaces-ee/icepush-ee/core/src/main/java/org/icepush/NoopPushGroupManager.java =================================================================== --- ../../Repositories/repo/icefaces-ee3/trunk/icefaces-ee/icepush-ee/core/src/main/java/org/icepush/NoopPushGroupManager.java (revision 28010) +++ ../../Repositories/repo/icefaces-ee3/trunk/icefaces-ee/icepush-ee/core/src/main/java/org/icepush/NoopPushGroupManager.java (revision ) @@ -60,7 +60,10 @@ public void pruneParkedIDs(String notifyBackURI, List listenedPushIds) { } - + + public void recordListen(List pushIdList, int sequenceNumber) { + } + public void removeMember(String groupName, String pushId) { } Index: ../../Repositories/repo/icefaces-ee3/trunk/icefaces-ee/samples/eps/poll/src/main/java/com/icesoft/faces/demo/poll/Poll.java =================================================================== --- ../../Repositories/repo/icefaces-ee3/trunk/icefaces-ee/samples/eps/poll/src/main/java/com/icesoft/faces/demo/poll/Poll.java (revision 27269) +++ ../../Repositories/repo/icefaces-ee3/trunk/icefaces-ee/samples/eps/poll/src/main/java/com/icesoft/faces/demo/poll/Poll.java (revision ) @@ -11,6 +11,7 @@ import javax.faces.event.ActionEvent; import org.icefaces.application.PortableRenderer; +import org.icefaces.application.PushMessage; public class Poll { private static final Logger LOGGER = Logger.getLogger(Poll.class.getName()); @@ -138,7 +139,7 @@ } if (_modified && broadcast) { Participant.getInstance().voteAgainst(id); - portableRenderer.render("poll"); + portableRenderer.render("poll", new PushMessage("Poll Update", "An against vote has been casted.")); if (pollMessageService != null) { pollMessageService.publishNow( id + "\r\n" + participantId, @@ -164,7 +165,7 @@ } if (_modified && broadcast) { Participant.getInstance().voteFor(id); - portableRenderer.render("poll"); + portableRenderer.render("poll", new PushMessage("Poll Update", "A for vote has been casted.")); if (pollMessageService != null) { pollMessageService.publishNow( id + "\r\n" + participantId, Index: ../../Repositories/repo/icefaces-ee3/trunk/icefaces-ee/samples/eps/poll/src/main/webapp/WEB-INF/web.xml =================================================================== --- ../../Repositories/repo/icefaces-ee3/trunk/icefaces-ee/samples/eps/poll/src/main/webapp/WEB-INF/web.xml (revision 27320) +++ ../../Repositories/repo/icefaces-ee3/trunk/icefaces-ee/samples/eps/poll/src/main/webapp/WEB-INF/web.xml (revision ) @@ -12,10 +12,50 @@ - + + org.icepush.pushGroupManager + com.icesoft.push.DynamicPushGroupManager + + + + smtp.host + mail.icesoft.com + + + + smtp.port + 465 + + + + smtp.from + Jack@ICEsoft.com + + + + smtp.user + jvanooststroom + + + + smtp.password + + + + + smtp.security + TLS + + + + org.jboss.jbossfaces.WAR_BUNDLES_JSF_IMPL + true + + + com.icesoft.faces.demo.poll.debugMode - false + true @@ -43,9 +83,11 @@ Production + Faces Servlet Index: ../../Repositories/repo/icefaces-ee3/trunk/icefaces-ee/icepush-ee/core/src/main/java/org/icepush/LocalNotificationBroadcaster.java =================================================================== --- ../../Repositories/repo/icefaces-ee3/trunk/icefaces-ee/icepush-ee/core/src/main/java/org/icepush/LocalNotificationBroadcaster.java (revision 29782) +++ ../../Repositories/repo/icefaces-ee3/trunk/icefaces-ee/icepush-ee/core/src/main/java/org/icepush/LocalNotificationBroadcaster.java (revision ) @@ -23,8 +23,12 @@ import java.util.Map; import java.util.concurrent.Semaphore; import java.util.concurrent.locks.ReentrantLock; +import java.util.logging.Level; +import java.util.logging.Logger; public class LocalNotificationBroadcaster implements NotificationBroadcaster { + private static final Logger LOGGER = Logger.getLogger(LocalNotificationBroadcaster.class.getName()); + private static final String[] STRINGS = new String[0]; private ArrayList receivers = new ArrayList(); @@ -32,83 +36,17 @@ receivers.add(receiver); } - public String[] broadcast(String[] notifiedPushIds) { - final int size = receivers.size(); - final Semaphore semaphore = new Semaphore(size, true); - final HashSet confirmedPushIds = new HashSet(); - - try { - semaphore.acquire(size); - Confirmation confirmation = new Confirmation() { - public void handlingConfirmed(String[] ids) { - confirmedPushIds.addAll(Arrays.asList(ids)); - semaphore.release(); - } - }; - for (Receiver receiver : receivers) { - receiver.receive(notifiedPushIds, confirmation); - } - - //block until all notified parties confirm the sending of the pushID notifications - semaphore.acquire(size); - } catch (InterruptedException e) { - return STRINGS; - } - - return confirmedPushIds.toArray(STRINGS); - } - - private enum ConfirmationStatus { FALSE, TRUE, DELAYED } - private final Map receiverConfirmedMap = new HashMap(); - private final ReentrantLock receiverConfirmedMapLock = new ReentrantLock(); - - public void broadcast(final String[] notifiedPushIds, final NotifiedPushIDsHandler notifiedPushIDsHandler) { - final int size = receivers.size(); - final Map receiverConfirmationMap = new HashMap(); + public void broadcast(final String[] notifiedPushIds) { + LOGGER.log( + Level.INFO, + "LocalNotificationBroadcaster." + + "broadcast(" + + "notifiedPushIds: '" + Arrays.asList(notifiedPushIds) + "')"); for (final Receiver receiver : receivers) { - receiverConfirmedMapLock.lock(); - try { - if (!receiverConfirmedMap.containsKey(receiver) || - receiverConfirmedMap.get(receiver) == ConfirmationStatus.TRUE) { - - receiverConfirmedMap.put(receiver, ConfirmationStatus.FALSE); - receiverConfirmationMap.put( - receiver, - new Confirmation() { - public void handlingConfirmed(final String[] pushIDs) { - receiverConfirmedMapLock.lock(); - try { - notifiedPushIDsHandler.handle(pushIDs); - if (receiverConfirmedMap.containsKey(receiver)) { - receiverConfirmedMap.put(receiver, ConfirmationStatus.TRUE); + receiver.receive(notifiedPushIds); - } + } - } finally { - receiverConfirmedMapLock.unlock(); - } + } - } - }); - } else if ( - receiverConfirmedMap.containsKey(receiver) && - receiverConfirmedMap.get(receiver) == ConfirmationStatus.FALSE) { - receiverConfirmedMap.put(receiver, ConfirmationStatus.DELAYED); - } - } finally { - receiverConfirmedMapLock.unlock(); - } - } - for (Receiver receiver : receivers) { - receiverConfirmedMapLock.lock(); - try { - if (receiverConfirmedMap.get(receiver) == ConfirmationStatus.FALSE) { - receiver.receive(notifiedPushIds, receiverConfirmationMap.remove(receiver)); - } - } finally { - receiverConfirmedMapLock.unlock(); - } - } - } - public void deleteReceiver(Receiver observer) { receivers.remove(observer); } Index: ../../Repositories/repo/icefaces-ee3/trunk/icefaces-ee/icepush-ee/core/src/main/java/org/icepush/LocalPushGroupManager.java =================================================================== --- ../../Repositories/repo/icefaces-ee3/trunk/icefaces-ee/icepush-ee/core/src/main/java/org/icepush/LocalPushGroupManager.java (revision 29782) +++ ../../Repositories/repo/icefaces-ee3/trunk/icefaces-ee/icepush-ee/core/src/main/java/org/icepush/LocalPushGroupManager.java (revision ) @@ -56,11 +56,13 @@ private final long groupTimeout; private final long pushIdTimeout; private final ServletContext context; + private final Timer timeoutTimer = new Timer("Timeout timer", true); private long lastTouchScan = System.currentTimeMillis(); private long lastExpiryScan = System.currentTimeMillis(); public LocalPushGroupManager(final ServletContext servletContext) { + LOGGER.log(Level.INFO, "org.icepush.LocalPushGroupManager"); this.context = servletContext; Configuration configuration = new ServletContextConfiguration("org.icepush", servletContext); this.groupTimeout = configuration.getAttributeAsLong("groupTimeout", 2 * 60 * 1000); @@ -71,7 +73,7 @@ this.timer.schedule(queueConsumer, 0); } - public void scan(String[] confirmedPushIDs) { + public void scan(final String[] confirmedPushIDs) { Set pushIDs = new HashSet(); long now = System.currentTimeMillis(); //accumulate pushIDs @@ -83,10 +85,6 @@ group.touchIfMatching(pushIDs); group.discardIfExpired(); } - for (PushID pushID : new ArrayList(pushIDMap.values())) { - pushID.touchIfMatching(pushIDs); - pushID.discardIfExpired(); - } } finally { lastTouchScan = now; lastExpiryScan = now; @@ -138,6 +136,11 @@ } public void push(final String groupName) { + LOGGER.log( + Level.INFO, + "LocalPushGroupManager." + + "push(" + + "groupName: '" + groupName + "')"); if (!queue.offer(new Notification(groupName))) { if (LOGGER.isLoggable(Level.INFO)) { LOGGER.log(Level.INFO, "Notification for group '" + groupName + "' was dropped, queue maximum size reached."); @@ -146,10 +149,35 @@ } public void push(final String groupName, final PushConfiguration config) { + LOGGER.log( + Level.INFO, + "LocalPushGroupManager." + + "push(" + + "groupName: '" + groupName + "', " + + "pushConfiguration: '" + config + "')"); //add this notification to a blocking queue queue.add(new OutOfBandNotification(groupName, config)); } + public void recordListen(final List pushIDList, final int sequenceNumber) { + if (LOGGER.isLoggable(Level.INFO)) { + LOGGER.log( + Level.INFO, + "LocalPushGroupManager." + + "recordListen(" + + "pushIDList: '" + pushIDList + "', " + + "sequenceNumber: '" + sequenceNumber + "')"); + } + for (final String pushIDString : pushIDList) { + PushID pushID = pushIDMap.get(pushIDString); + if (pushID != null) { + LOGGER.log( + Level.INFO, "Record Listen :: PUSHID: " + pushIDString + ", Sequence Number: " + sequenceNumber); + pushID.setSequenceNumber(sequenceNumber); + } + } + } + public void removeMember(final String groupName, final String pushId) { Group group = groupMap.get(groupName); if (group != null) { @@ -190,6 +218,42 @@ timer.cancel(); } + public void cancelConfirmationTimeout(final String[] pushIDs) { + for (final String pushID : pushIDs) { + pushIDMap.get(pushID).cancelConfirmationTimeout(); + } + } + + public void cancelExpiryTimeout(final String[] pushIDs) { + for (final String pushID : pushIDs) { + pushIDMap.get(pushID).cancelExpiryTimeout(); + } + } + + public void startConfirmationTimeout(final String[] pushIDs) { + for (final String pushID : pushIDs) { + pushIDMap.get(pushID).startConfirmationTimeout(); + } + } + + public void startExpiryTimeout(final String[] pushIDs) { + for (final String pushID : pushIDs) { + pushIDMap.get(pushID).startExpiryTimeout(); + } + } + + protected Map getPushIDMap() { + return Collections.unmodifiableMap(pushIDMap); + } + + protected HashMap getPushIDSequenceNumberMap() { + HashMap pushIDSequenceNumberMap = new HashMap(); + for (final PushID pushID : pushIDMap.values()) { + pushIDSequenceNumberMap.put(pushID.id, pushID.sequenceNumber); + } + return pushIDSequenceNumberMap; + } + private void scanForExpiry() { long now = System.currentTimeMillis(); //avoid to scan/touch the groups on each notification @@ -198,22 +262,23 @@ for (Group group : new ArrayList(groupMap.values())) { group.discardIfExpired(); } - for (PushID pushID : new ArrayList(pushIDMap.values())) { - pushID.discardIfExpired(); - } } finally { lastExpiryScan = now; } } } - public void touchPushID(final String id, final Long timestamp) { - PushID pushID = pushIDMap.get(id); - if (pushID != null) { - pushID.touch(timestamp); + public void listeningPushIDs(final Map pushIDSequenceNumberMap) { + // Do nothing. - } + } - } +// public void touchPushID(final String id, final Long timestamp) { +// PushID pushID = pushIDMap.get(id); +// if (pushID != null) { +// pushID.touch(timestamp); +// } +// } + private OutOfBandNotifier getOutOfBandNotifier() { Object attribute = context.getAttribute(OutOfBandNotifier.class.getName()); return attribute == null ? NOOPOutOfBandNotifier : (OutOfBandNotifier) attribute; @@ -291,89 +356,161 @@ } } - private class PushID { + protected class PushID { private final String id; private final Set groups = new HashSet(); private long lastAccess = System.currentTimeMillis(); + private int sequenceNumber; + private TimerTask confirmationTimeout; + private TimerTask expiryTimeout; + private PushConfiguration pushConfiguration; private PushID(String id, String group) { this.id = id; addToGroup(group); } - private void addToGroup(String group) { - groups.add(group); - } + public int getSequenceNumber() { + return sequenceNumber; - private void removeFromGroup(String group) { - groups.remove(group); - if (groups.isEmpty()) { - if (LOGGER.isLoggable(Level.FINEST)) { - LOGGER.log(Level.FINEST, "Disposed '" + id + "' pushId since it no longer belongs to any group."); - } + } - pushIDMap.remove(id); - } - } - private void touch() { - touch(System.currentTimeMillis()); + public void addToGroup(String group) { + groups.add(group); } - private void touch(final Long timestamp) { - lastAccess = timestamp; + public void cancelConfirmationTimeout() { + LOGGER.log(Level.INFO, "Cancelling confirmation timeout."); + if (confirmationTimeout != null) { + confirmationTimeout.cancel(); + confirmationTimeout = null; + pushConfiguration = null; - } + } + } - private void touchIfMatching(Set pushIDs) { - if (pushIDs.contains(id)) { - touch(); - pushIDTouched(id, lastAccess); + public void cancelExpiryTimeout() { + LOGGER.log(Level.INFO, "Cancelling expiry timeout."); + if (expiryTimeout != null) { + expiryTimeout.cancel(); + expiryTimeout = null; } } - private void discardIfExpired() { - //expire pushId - if (!parkedPushIDs.containsKey(id) && lastAccess + pushIdTimeout < System.currentTimeMillis()) { + public void discard() { - if (LOGGER.isLoggable(Level.FINEST)) { + if (LOGGER.isLoggable(Level.FINEST)) { - LOGGER.log(Level.FINEST, "'" + id + "' pushId expired."); + LOGGER.log(Level.FINEST, "'" + id + "' pushId discarded."); - } - pushIDMap.remove(id); - pendingNotifications.remove(id); - for (String groupName : groups) { - Group group = groupMap.get(groupName); - if (group != null) { - group.removePushID(id); - } - } - } + } + pushIDMap.remove(id); + pendingNotifications.remove(id); + for (String groupName : groups) { + Group group = groupMap.get(groupName); + if (group != null) { + group.removePushID(id); + } + } + } + +// public void discardIfExpired() { +// //expire pushId +// if (!parkedPushIDs.containsKey(id) && lastAccess + pushIdTimeout < System.currentTimeMillis()) { +// discard(); +// } +// } + + public void removeFromGroup(String group) { + groups.remove(group); + if (groups.isEmpty()) { + if (LOGGER.isLoggable(Level.FINEST)) { + LOGGER.log(Level.FINEST, "Disposed '" + id + "' pushId since it no longer belongs to any group."); - } + } + pushIDMap.remove(id); - } + } + } + public void setSequenceNumber(final int sequenceNumber) { + this.sequenceNumber = sequenceNumber; + } + + public void startConfirmationTimeout() { + if (pushConfiguration != null) { + LOGGER.log(Level.INFO, "Starting confirmation timeout."); + timeoutTimer.schedule( + confirmationTimeout = + new TimerTask() { + @Override + public void run() { + LOGGER.log(Level.INFO, "Executing confirmation timeout."); + String notifyBackURI = parkedPushIDs.get(id); + if (notifyBackURI != null) { + if (LOGGER.isLoggable(Level.INFO)) { + LOGGER.log(Level.INFO, "Cloud Push dispatched for " + id); + } + getOutOfBandNotifier().broadcast( + (PushNotification)pushConfiguration, + new String[] { + notifyBackURI + }); + } + cancelConfirmationTimeout(); + } + }, + 100); + } + } + + public void startExpiryTimeout() { + LOGGER.log(Level.INFO, "Starting expiry timeout."); + timeoutTimer.schedule( + expiryTimeout = + new TimerTask() { + @Override + public void run() { + LOGGER.log(Level.INFO, "Executing expiry timeout."); + LOGGER.info("PushID expired: " + id); + discard(); + cancelExpiryTimeout(); + } + }, + pushIdTimeout); + } + +// public void touch() { +// touch(System.currentTimeMillis()); +// } +// +// public void touch(final Long timestamp) { +// lastAccess = timestamp; +// } +// +// public void touchIfMatching(Set pushIDs) { +// if (pushIDs.contains(id)) { +// touch(); +// pushIDTouched(id, lastAccess); +// } +// } + } + private class Notification implements Runnable { - private final String groupName; + protected final String groupName; public Notification(String groupName) { this.groupName = groupName; } public void run() { + LOGGER.log( + Level.INFO, + "LocalPushGroupManager.Notification." + + "run()"); try { Group group = groupMap.get(groupName); if (group != null) { - if (LOGGER.isLoggable(Level.FINEST)) { - LOGGER.log(Level.FINEST, "Push notification triggered for '" + groupName + "' group."); + if (LOGGER.isLoggable(Level.INFO)) { + LOGGER.log(Level.INFO, "Push notification triggered for '" + groupName + "' group."); } String[] pushIDs = group.getPushIDs(); pendingNotifications.addAll(Arrays.asList(pushIDs)); - outboundNotifier.broadcast( - pushIDs, - new NotifiedPushIDsHandler() { - public void handle(final String[] notifiedPushIDs) { - for (int i = 0; i < notifiedPushIDs.length; i++) { - parkedPushIDs.remove(notifiedPushIDs[i]); - } - scan(notifiedPushIDs); - } - }); + outboundNotifier.broadcast(pushIDs); pushed(groupName); } } finally { @@ -383,63 +520,25 @@ } private class OutOfBandNotification extends Notification { - private final String groupName; private final PushConfiguration config; public OutOfBandNotification(String groupName, PushConfiguration config) { super(groupName); - this.groupName = groupName; this.config = config; } public void run() { - try { - try { - Group group = groupMap.get(groupName); + Group group = groupMap.get(groupName); + String[] pushIDs = STRINGS; - if (group != null) { + if (group != null) { - if (LOGGER.isLoggable(Level.FINEST)) { - LOGGER.log(Level.FINEST, "Push notification triggered for '" + groupName + "' group."); - } - String[] pushIDs = group.getPushIDs(); - pendingNotifications.addAll(Arrays.asList(pushIDs)); - - String[] notified = outboundNotifier.broadcast(pushIDs); - for (int i = 0; i < notified.length; i++) { - parkedPushIDs.remove(notified[i]); - } - scan(notified); - pushed(groupName); - } - } finally { - scanForExpiry(); - } - Group group = groupMap.get(groupName); - String[] pushIDs = new String[0]; - if (null != group) { - pushIDs = group.getPushIDs(); - } + pushIDs = group.getPushIDs(); + } - HashSet uris = new HashSet(); - for (int i = 0; i < pushIDs.length; i++) { - String pushID = pushIDs[i]; - String uri = parkedPushIDs.get(pushID); - if (uri != null) { - uris.add(uri); + for (final String pushID : pushIDs) { + pushIDMap.get(pushID).pushConfiguration = config; - } + } + super.run(); - } + } - - if (!uris.isEmpty()) { - if (LOGGER.isLoggable(Level.FINE)) { - LOGGER.log(Level.FINE, "Cloud Push dispatched for " + parkedPushIDs); - } + } - getOutOfBandNotifier().broadcast( - (PushNotification) config, - (String[]) uris.toArray(STRINGS)); - } - } finally { - scanForExpiry(); - } - } - } private class QueueConsumerTask extends TimerTask { private boolean running = true; Index: ../../Repositories/repo/icefaces-ee3/trunk/icefaces-ee/icepush-ee/core/src/main/java/org/icepush/NotificationBroadcaster.java =================================================================== --- ../../Repositories/repo/icefaces-ee3/trunk/icefaces-ee/icepush-ee/core/src/main/java/org/icepush/NotificationBroadcaster.java (revision 29782) +++ ../../Repositories/repo/icefaces-ee3/trunk/icefaces-ee/icepush-ee/core/src/main/java/org/icepush/NotificationBroadcaster.java (revision ) @@ -23,15 +23,9 @@ void deleteReceiver(Receiver observer); - String[] broadcast(String[] notifiedPushIds); + void broadcast(String[] notifiedPushIds); - void broadcast(String[] notifiedPushIDs, NotifiedPushIDsHandler notifiedPushIDsHandler); - - public interface Confirmation { - void handlingConfirmed(String[] pushIds); - } - public interface Receiver { - void receive(String[] pushIds, Confirmation confirmation); + void receive(String[] pushIds); } } Index: ../../Repositories/repo/icefaces-ee3/trunk/icefaces-ee/icepush-ee/core-ee/src/main/java/com/icesoft/icepush/MainServlet.java =================================================================== --- ../../Repositories/repo/icefaces-ee3/trunk/icefaces-ee/icepush-ee/core-ee/src/main/java/com/icesoft/icepush/MainServlet.java (revision 30770) +++ ../../Repositories/repo/icefaces-ee3/trunk/icefaces-ee/icepush-ee/core-ee/src/main/java/com/icesoft/icepush/MainServlet.java (revision ) @@ -10,6 +10,7 @@ import javax.servlet.ServletContext; import javax.servlet.ServletContextEvent; import javax.servlet.ServletContextListener; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.logging.Logger; public class MainServlet extends org.icepush.servlet.MainServlet { @@ -19,8 +20,25 @@ this(context, true); } - public MainServlet(ServletContext servletContext, boolean terminateBlockingConnectionOnShutdown) { - super(servletContext, terminateBlockingConnectionOnShutdown, false); + public MainServlet(final ServletContext servletContext, + final boolean terminateBlockingConnectionOnShutdown) { + + this(servletContext, terminateBlockingConnectionOnShutdown, false); + } + + public MainServlet(final ServletContext servletContext, + final boolean terminateBlockingConnectionOnShutdown, + final boolean printProductInfo) { + + this(servletContext, terminateBlockingConnectionOnShutdown, printProductInfo, null); + } + + public MainServlet(final ServletContext servletContext, + final boolean terminateBlockingConnectionOnShutdown, + final boolean printProductInfo, + final ScheduledThreadPoolExecutor executor) { + + super(servletContext, terminateBlockingConnectionOnShutdown, printProductInfo, executor); log.info(new ProductInfo().toString()); } Index: ../../Repositories/repo/icefaces-ee3/trunk/icefaces-ee/icepush-ee/core/src/main/java/org/icepush/BlockingConnectionServer.java =================================================================== --- ../../Repositories/repo/icefaces-ee3/trunk/icefaces-ee/icepush-ee/core/src/main/java/org/icepush/BlockingConnectionServer.java (revision 29923) +++ ../../Repositories/repo/icefaces-ee3/trunk/icefaces-ee/icepush-ee/core/src/main/java/org/icepush/BlockingConnectionServer.java (revision ) @@ -54,10 +54,6 @@ public void run() { } }; - private static final NotificationBroadcaster.Confirmation NOOPConfirmation = new NotificationBroadcaster.Confirmation() { - public void handlingConfirmed(String[] pushIds) { - } - }; private final BlockingQueue pendingRequest = new LinkedBlockingQueue(1); private final Slot heartbeatInterval; @@ -67,7 +63,6 @@ private ConcurrentLinkedQueue notifiedPushIDs = new ConcurrentLinkedQueue(); private List participatingPushIDs = Collections.emptyList(); private TimerTask confirmationFailed = NOOPTask; - private NotificationBroadcaster.Confirmation confirmation = NOOPConfirmation; private Timer monitoringScheduler; private String lastWindow = ""; @@ -84,7 +79,7 @@ this.connectionRecreationTimeout = configuration.getAttributeAsLong("connectionRecreationTimeout", 5000); this.monitoringScheduler = monitoringScheduler; //add monitor - monitoringScheduler.scheduleAtFixedRate(this, 0, 1000); + this.monitoringScheduler.scheduleAtFixedRate(this, 0, 1000); this.pushGroupManager.addNotificationReceiver(this); //define blocking server @@ -108,11 +103,20 @@ } private boolean sendNotifications(String[] ids) { + log.log( + Level.INFO, + "BlockingConnectionServer." + + "sendNotifications(" + + "ids: '" + Arrays.asList(ids) + "')"); //stop sending notifications if pushID are not used anymore by the browser List pushIDs = new ArrayList(Arrays.asList(ids)); pushIDs.retainAll(participatingPushIDs); boolean anyNotifications = !pushIDs.isEmpty(); - + log.log( + Level.INFO, + "BlockingConnectionServer." + + "sendNotifications(" + + "ids: '" + Arrays.asList(ids) + "') :: anyNotifications: '" + anyNotifications + "'"); if (anyNotifications) { notifiedPushIDs.addAll(pushIDs); resetTimeout(); @@ -127,8 +131,12 @@ } private synchronized void respondIfNotificationsAvailable() { + log.log( + Level.INFO, + "BlockingConnectionServer." + + "respondIfNotificationsAvailable()"); if (!notifiedPushIDs.isEmpty()) { - //save notifications, maybe they will need to be resent when blocking connection switches to another window + //save notifications, maybe they will need to be resent when blocking connection switches to another window lastNotifications = (String[]) new HashSet(notifiedPushIDs).toArray(STRINGS); respondIfPendingRequest(new NotificationHandler(lastNotifications) { public void writeTo(Writer writer) throws IOException { @@ -137,6 +145,8 @@ if (log.isLoggable(Level.FINE)) { log.fine("Sending notifications for " + notifiedPushIDs + "."); } + pushGroupManager.startConfirmationTimeout(lastNotifications); + pushGroupManager.startExpiryTimeout(lastNotifications); pushGroupManager.clearPendingNotifications(participatingPushIDs); notifiedPushIDs.removeAll(Arrays.asList(lastNotifications)); } @@ -149,6 +159,10 @@ } private void respondIfPendingRequest(ResponseHandler handler) { + log.log( + Level.INFO, + "BlockingConnectionServer." + + "respondIfNotificationsAvailable()"); Request previousRequest = (Request) pendingRequest.poll(); if (previousRequest != null) { try { @@ -207,11 +221,9 @@ } } - public void receive(String[] pushIds, final NotificationBroadcaster.Confirmation confirmation) { - this.confirmation = confirmation; + public void receive(String[] pushIds) { this.confirmationFailed = new TimerTask() { public void run() { - confirmation.handlingConfirmed(STRINGS); if (notifyBackURI != null && !"".equals(notifyBackURI)) { pushGroupManager.park(participatingPushIDs.toArray(STRINGS), notifyBackURI); } @@ -236,11 +248,13 @@ } public void service(final Request request) throws Exception { + log.log(Level.INFO, "ice.push.sequence: " + request.getHeaderAsInteger("ice.push.sequence")); resetTimeout(); adjustConnectionRecreationTimeout(request); respondIfPendingRequest(CloseResponseDup); - + log.log(Level.INFO, "Record Listen..."); + pushGroupManager.recordListen(participatingPushIDs, request.getHeaderAsInteger("ice.push.sequence")); //resend notifications if the window owning the blocking connection has changed String currentWindow = request.getHeader("ice.push.window"); currentWindow = currentWindow == null ? "" : currentWindow; @@ -251,10 +265,10 @@ try { participatingPushIDs = Arrays.asList(request.getParameterAsStrings("ice.pushid")); notifyBackURI = request.getHeader("ice.notifyBack"); - confirmationFailed.cancel(); - confirmation.handlingConfirmed(participatingPushIDs.toArray(STRINGS)); - + pushGroupManager.scan(participatingPushIDs.toArray(STRINGS)); + pushGroupManager.cancelConfirmationTimeout(participatingPushIDs.toArray(STRINGS)); + pushGroupManager.cancelExpiryTimeout(participatingPushIDs.toArray(STRINGS)); if (null != notifyBackURI) { pushGroupManager.pruneParkedIDs(notifyBackURI, participatingPushIDs); Index: ../../Repositories/repo/icefaces-ee3/trunk/icefaces-ee/icepush-ee/eps/src/main/java/com/icesoft/push/RemotePushGroupManager.java =================================================================== --- ../../Repositories/repo/icefaces-ee3/trunk/icefaces-ee/icepush-ee/eps/src/main/java/com/icesoft/push/RemotePushGroupManager.java (revision 30573) +++ ../../Repositories/repo/icefaces-ee3/trunk/icefaces-ee/icepush-ee/eps/src/main/java/com/icesoft/push/RemotePushGroupManager.java (revision ) @@ -101,6 +101,16 @@ // throw new UnsupportedOperationException(); } + public void cancelConfirmationTimeout(final String[] pushIDs) + throws UnsupportedOperationException { + throw new UnsupportedOperationException(); + } + + public void cancelExpiryTimeout(final String[] pushIDs) + throws UnsupportedOperationException { + throw new UnsupportedOperationException(); + } + public void clearPendingNotifications(final List pushIdList) { MessagePayload _messagePayload = new MessagePayload(); _messagePayload.setPushIdList(pushIdList); @@ -130,7 +140,8 @@ throw new UnsupportedOperationException(); } - public void pruneParkedIDs(String notifyBackURI, List listenedPushIds) { + public void pruneParkedIDs(String notifyBackURI, List listenedPushIds) + throws UnsupportedOperationException { throw new UnsupportedOperationException(); } @@ -153,6 +164,16 @@ "Push"); } + public void scan(final String[] confirmedPushIDs) + throws UnsupportedOperationException { + //To change body of implemented methods use File | Settings | File Templates. + } + + public void recordListen(List pushIdList, int sequenceNumber) + throws UnsupportedOperationException { + throw new UnsupportedOperationException(); + } + public void removeMember(final String groupName, final String pushId) { MessagePayload _messagePayload = new MessagePayload(); _messagePayload.setGroupName(getNamespacedGroupName(groupName)); @@ -170,6 +191,16 @@ executor.shutdownNow(); } + public void startConfirmationTimeout(final String[] pushIDs) + throws UnsupportedOperationException { + throw new UnsupportedOperationException(); + } + + public void startExpiryTimeout(final String[] pushIDs) + throws UnsupportedOperationException { + throw new UnsupportedOperationException(); + } + private String getNamespacedGroupName(final String groupName) { if (!groupName.startsWith("/")) { return applicationNameSpace + "/" + groupName; Index: ../../Repositories/repo/icefaces-ee3/trunk/icefaces-ee/samples/eps/poll/src/main/java/com/icesoft/faces/demo/poll/PollCollection.java =================================================================== --- ../../Repositories/repo/icefaces-ee3/trunk/icefaces-ee/samples/eps/poll/src/main/java/com/icesoft/faces/demo/poll/PollCollection.java (revision 30902) +++ ../../Repositories/repo/icefaces-ee3/trunk/icefaces-ee/samples/eps/poll/src/main/java/com/icesoft/faces/demo/poll/PollCollection.java (revision ) @@ -25,6 +25,7 @@ import javax.servlet.ServletContext; import org.icefaces.application.PortableRenderer; +import org.icefaces.application.PushMessage; import org.icefaces.application.PushRenderer; import org.icepush.servlet.ServletContextConfiguration; @@ -68,7 +69,9 @@ } } if (_modified) { - portableRenderer.render("poll"); + portableRenderer.render( + "poll", + new PushMessage("Poll Update", "One or more polls have been removed.")); } } }; @@ -155,7 +158,7 @@ void addPoll(final Poll poll, final boolean broadcast) { pollMap.put(poll.getId(), poll); - portableRenderer.render("poll"); + portableRenderer.render("poll", new PushMessage("Poll Update", "A new poll has been added.")); if (broadcast) { if (pollMessageService != null) { pollMessageService.publishNow( Index: ../../Repositories/repo/icefaces-ee3/trunk/icefaces-ee/icepush-ee/eps/src/main/java/com/icesoft/push/DynamicPushGroupManager.java =================================================================== --- ../../Repositories/repo/icefaces-ee3/trunk/icefaces-ee/icepush-ee/eps/src/main/java/com/icesoft/push/DynamicPushGroupManager.java (revision 30574) +++ ../../Repositories/repo/icefaces-ee3/trunk/icefaces-ee/icepush-ee/eps/src/main/java/com/icesoft/push/DynamicPushGroupManager.java (revision ) @@ -86,6 +86,26 @@ } } + public void cancelConfirmationTimeout(final String[] pushIDs) { + lock.lock(); + try { + checkManager(); + currentPushGroupManager.cancelConfirmationTimeout(pushIDs); + } finally { + lock.unlock(); + } + } + + public void cancelExpiryTimeout(final String[] pushIDs) { + lock.lock(); + try { + checkManager(); + currentPushGroupManager.cancelExpiryTimeout(pushIDs); + } finally { + lock.unlock(); + } + } + public void clearPendingNotifications(final List pushIdList) { lock.lock(); try { @@ -176,6 +196,16 @@ } } + public void recordListen(final List pushIdList, final int sequenceNumber) { + lock.lock(); + try { + checkManager(); + currentPushGroupManager.recordListen(pushIdList, sequenceNumber); + } finally { + lock.unlock(); + } + } + public void removeMember(final String groupName, final String pushId) { lock.lock(); try { @@ -186,6 +216,16 @@ } } + public void scan(final String[] confirmedPushIDs) { + lock.lock(); + try { + checkManager(); + currentPushGroupManager.scan(confirmedPushIDs); + } finally { + lock.unlock(); + } + } + public void shutdown() { lock.lock(); try { @@ -196,6 +236,26 @@ } } + public void startConfirmationTimeout(final String[] pushIDs) { + lock.lock(); + try { + checkManager(); + currentPushGroupManager.startConfirmationTimeout(pushIDs); + } finally { + lock.unlock(); + } + } + + public void startExpiryTimeout(final String[] pushIDs) { + lock.lock(); + try { + checkManager(); + currentPushGroupManager.startExpiryTimeout(pushIDs); + } finally { + lock.unlock(); + } + } + private void checkManager() { lock.lock(); try {