|
46 | 46 | import org.apache.cloudstack.framework.config.dao.ConfigurationDao; |
47 | 47 | import org.apache.cloudstack.framework.jobs.AsyncJob; |
48 | 48 | import org.apache.cloudstack.framework.jobs.AsyncJobExecutionContext; |
| 49 | +import org.apache.cloudstack.framework.messagebus.MessageBus; |
| 50 | +import org.apache.cloudstack.framework.messagebus.MessageSubscriber; |
49 | 51 | import org.apache.cloudstack.managed.context.ManagedContextRunnable; |
50 | 52 | import org.apache.cloudstack.outofbandmanagement.dao.OutOfBandManagementDao; |
51 | 53 | import org.apache.cloudstack.utils.identity.ManagementServerNode; |
|
83 | 85 | import com.cloud.dc.dao.ClusterDao; |
84 | 86 | import com.cloud.dc.dao.DataCenterDao; |
85 | 87 | import com.cloud.dc.dao.HostPodDao; |
| 88 | +import com.cloud.event.EventTypes; |
86 | 89 | import com.cloud.exception.AgentUnavailableException; |
87 | 90 | import com.cloud.exception.ConnectionException; |
88 | 91 | import com.cloud.exception.OperationTimedoutException; |
@@ -189,6 +192,8 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl |
189 | 192 |
|
190 | 193 | @Inject |
191 | 194 | ManagementServiceConfiguration mgmtServiceConf; |
| 195 | + @Inject |
| 196 | + MessageBus messageBus; |
192 | 197 |
|
193 | 198 | protected final ConfigKey<Integer> Workers = new ConfigKey<Integer>("Advanced", Integer.class, "workers", "5", |
194 | 199 | "Number of worker threads handling remote agent connections.", false); |
@@ -244,6 +249,8 @@ public boolean configure(final String name, final Map<String, Object> params) th |
244 | 249 |
|
245 | 250 | _monitorExecutor = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("AgentMonitor")); |
246 | 251 |
|
| 252 | + initMessageBusListener(); |
| 253 | + |
247 | 254 | return true; |
248 | 255 | } |
249 | 256 |
|
@@ -1803,4 +1810,57 @@ public int getTimeout() { |
1803 | 1810 |
|
1804 | 1811 | } |
1805 | 1812 |
|
| 1813 | + protected Map<Long, List<Long>> getHostsPerZone() { |
| 1814 | + List<HostVO> allHosts = _resourceMgr.listAllHostsInAllZonesByType(Host.Type.Routing); |
| 1815 | + if (allHosts == null) { |
| 1816 | + return null; |
| 1817 | + } |
| 1818 | + Map<Long, List<Long>> hostsByZone = new HashMap<Long, List<Long>>(); |
| 1819 | + for (HostVO host : allHosts) { |
| 1820 | + if (host.getHypervisorType() == HypervisorType.KVM || host.getHypervisorType() == HypervisorType.LXC) { |
| 1821 | + Long zoneId = host.getDataCenterId(); |
| 1822 | + List<Long> hostIds = hostsByZone.get(zoneId); |
| 1823 | + if (hostIds == null) { |
| 1824 | + hostIds = new ArrayList<Long>(); |
| 1825 | + } |
| 1826 | + hostIds.add(host.getId()); |
| 1827 | + hostsByZone.put(zoneId, hostIds); |
| 1828 | + } |
| 1829 | + } |
| 1830 | + return hostsByZone; |
| 1831 | + } |
| 1832 | + |
| 1833 | + private void sendCommandToAgents(Map<Long, List<Long>> hostsPerZone, Map<String, String> params) { |
| 1834 | + SetHostParamsCommand cmds = new SetHostParamsCommand(params); |
| 1835 | + for (Long zoneId : hostsPerZone.keySet()) { |
| 1836 | + List<Long> hostIds = hostsPerZone.get(zoneId); |
| 1837 | + for (Long hostId : hostIds) { |
| 1838 | + Answer answer = easySend(hostId, cmds); |
| 1839 | + if (answer == null || !answer.getResult()) { |
| 1840 | + s_logger.error("Error sending parameters to agent " + hostId); |
| 1841 | + } |
| 1842 | + } |
| 1843 | + } |
| 1844 | + } |
| 1845 | + |
| 1846 | + private void propagateChangeToAgents() { |
| 1847 | + s_logger.debug("Propagating changes on host parameters to the agents"); |
| 1848 | + Map<Long, List<Long>> hostsPerZone = getHostsPerZone(); |
| 1849 | + Map<String, String> params = new HashMap<String, String>(); |
| 1850 | + params.put("router.aggregation.command.each.timeout", _configDao.getValue("router.aggregation.command.each.timeout")); |
| 1851 | + sendCommandToAgents(hostsPerZone, params); |
| 1852 | + } |
| 1853 | + |
| 1854 | + private void initMessageBusListener() { |
| 1855 | + messageBus.subscribe(EventTypes.EVENT_CONFIGURATION_VALUE_EDIT, new MessageSubscriber() { |
| 1856 | + @Override |
| 1857 | + public void onPublishMessage(String serderAddress, String subject, Object args) { |
| 1858 | + String globalSettingUpdated = (String)args; |
| 1859 | + if (globalSettingUpdated.equals("router.aggregation.command.each.timeout")) { |
| 1860 | + propagateChangeToAgents(); |
| 1861 | + } |
| 1862 | + } |
| 1863 | + }); |
| 1864 | + } |
| 1865 | + |
1806 | 1866 | } |
0 commit comments