|
45 | 45 | import org.apache.cloudstack.framework.config.dao.ConfigurationDao; |
46 | 46 | import org.apache.cloudstack.framework.jobs.AsyncJob; |
47 | 47 | import org.apache.cloudstack.framework.jobs.AsyncJobExecutionContext; |
| 48 | +import org.apache.cloudstack.framework.messagebus.MessageBus; |
| 49 | +import org.apache.cloudstack.framework.messagebus.MessageSubscriber; |
48 | 50 | import org.apache.cloudstack.managed.context.ManagedContextRunnable; |
49 | 51 | import org.apache.cloudstack.outofbandmanagement.dao.OutOfBandManagementDao; |
50 | 52 | import org.apache.cloudstack.utils.identity.ManagementServerNode; |
|
83 | 85 | import com.cloud.dc.dao.ClusterDao; |
84 | 86 | import com.cloud.dc.dao.DataCenterDao; |
85 | 87 | import com.cloud.dc.dao.HostPodDao; |
| 88 | +import com.cloud.event.EventTypes; |
86 | 89 | import com.cloud.exception.AgentUnavailableException; |
87 | 90 | import com.cloud.exception.ConnectionException; |
88 | 91 | import com.cloud.exception.OperationTimedoutException; |
@@ -187,6 +190,8 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl |
187 | 190 | ResourceManager _resourceMgr; |
188 | 191 | @Inject |
189 | 192 | ManagementServiceConfiguration mgmtServiceConf; |
| 193 | + @Inject |
| 194 | + MessageBus messageBus; |
190 | 195 |
|
191 | 196 | protected final ConfigKey<Integer> Workers = new ConfigKey<Integer>("Advanced", Integer.class, "workers", "5", |
192 | 197 | "Number of worker threads handling remote agent connections.", false); |
@@ -237,6 +242,8 @@ public boolean configure(final String name, final Map<String, Object> params) th |
237 | 242 |
|
238 | 243 | _monitorExecutor = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("AgentMonitor")); |
239 | 244 |
|
| 245 | + initMessageBusListener(); |
| 246 | + |
240 | 247 | return true; |
241 | 248 | } |
242 | 249 |
|
@@ -1790,4 +1797,57 @@ public int getTimeout() { |
1790 | 1797 |
|
1791 | 1798 | } |
1792 | 1799 |
|
| 1800 | + protected Map<Long, List<Long>> getHostsPerZone() { |
| 1801 | + List<HostVO> allHosts = _resourceMgr.listAllHostsInAllZonesByType(Host.Type.Routing); |
| 1802 | + if (allHosts == null) { |
| 1803 | + return null; |
| 1804 | + } |
| 1805 | + Map<Long, List<Long>> hostsByZone = new HashMap<Long, List<Long>>(); |
| 1806 | + for (HostVO host : allHosts) { |
| 1807 | + if (host.getHypervisorType() == HypervisorType.KVM || host.getHypervisorType() == HypervisorType.LXC) { |
| 1808 | + Long zoneId = host.getDataCenterId(); |
| 1809 | + List<Long> hostIds = hostsByZone.get(zoneId); |
| 1810 | + if (hostIds == null) { |
| 1811 | + hostIds = new ArrayList<Long>(); |
| 1812 | + } |
| 1813 | + hostIds.add(host.getId()); |
| 1814 | + hostsByZone.put(zoneId, hostIds); |
| 1815 | + } |
| 1816 | + } |
| 1817 | + return hostsByZone; |
| 1818 | + } |
| 1819 | + |
| 1820 | + private void sendCommandToAgents(Map<Long, List<Long>> hostsPerZone, Map<String, String> params) { |
| 1821 | + SetHostParamsCommand cmds = new SetHostParamsCommand(params); |
| 1822 | + for (Long zoneId : hostsPerZone.keySet()) { |
| 1823 | + List<Long> hostIds = hostsPerZone.get(zoneId); |
| 1824 | + for (Long hostId : hostIds) { |
| 1825 | + Answer answer = easySend(hostId, cmds); |
| 1826 | + if (answer == null || !answer.getResult()) { |
| 1827 | + s_logger.error("Error sending parameters to agent " + hostId); |
| 1828 | + } |
| 1829 | + } |
| 1830 | + } |
| 1831 | + } |
| 1832 | + |
| 1833 | + private void propagateChangeToAgents() { |
| 1834 | + s_logger.debug("Propagating changes on host parameters to the agents"); |
| 1835 | + Map<Long, List<Long>> hostsPerZone = getHostsPerZone(); |
| 1836 | + Map<String, String> params = new HashMap<String, String>(); |
| 1837 | + params.put("router.aggregation.command.each.timeout", _configDao.getValue("router.aggregation.command.each.timeout")); |
| 1838 | + sendCommandToAgents(hostsPerZone, params); |
| 1839 | + } |
| 1840 | + |
| 1841 | + private void initMessageBusListener() { |
| 1842 | + messageBus.subscribe(EventTypes.EVENT_CONFIGURATION_VALUE_EDIT, new MessageSubscriber() { |
| 1843 | + @Override |
| 1844 | + public void onPublishMessage(String serderAddress, String subject, Object args) { |
| 1845 | + String globalSettingUpdated = (String)args; |
| 1846 | + if (globalSettingUpdated.equals("router.aggregation.command.each.timeout")) { |
| 1847 | + propagateChangeToAgents(); |
| 1848 | + } |
| 1849 | + } |
| 1850 | + }); |
| 1851 | + } |
| 1852 | + |
1793 | 1853 | } |
0 commit comments