// OffloadingTopicManagerImpl.java 13.1 KB
/*
 * Decompiled with CFR 0_118.
 * 
 * Could not load the following classes:
 *  org.apache.commons.lang.StringUtils
 *  org.apache.felix.scr.annotations.Component
 *  org.apache.felix.scr.annotations.Reference
 *  org.apache.felix.scr.annotations.Service
 *  org.apache.http.HttpEntity
 *  org.apache.http.StatusLine
 *  org.apache.http.client.config.RequestConfig
 *  org.apache.http.client.config.RequestConfig$Builder
 *  org.apache.http.client.entity.UrlEncodedFormEntity
 *  org.apache.http.client.methods.CloseableHttpResponse
 *  org.apache.http.client.methods.HttpPost
 *  org.apache.http.client.methods.HttpUriRequest
 *  org.apache.http.impl.client.CloseableHttpClient
 *  org.apache.http.impl.client.HttpClientBuilder
 *  org.apache.http.impl.client.HttpClients
 *  org.apache.http.message.BasicNameValuePair
 *  org.apache.sling.discovery.DiscoveryService
 *  org.apache.sling.discovery.InstanceDescription
 *  org.apache.sling.discovery.InstanceFilter
 *  org.apache.sling.discovery.TopologyView
 *  org.slf4j.Logger
 *  org.slf4j.LoggerFactory
 */
package com.adobe.granite.offloading.impl;

import com.adobe.granite.offloading.api.OffloadingException;
import com.adobe.granite.offloading.api.OffloadingTopicManager;
import com.adobe.granite.offloading.api.TopicConfigurationAction;
import com.adobe.granite.offloading.api.TopicInstancesHolder;
import com.adobe.granite.offloading.impl.TopicInstancesHolderImpl;
import com.adobe.granite.offloading.impl.util.OffloadingUtil;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang.StringUtils;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.Service;
import org.apache.http.HttpEntity;
import org.apache.http.StatusLine;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.message.BasicNameValuePair;
import org.apache.sling.discovery.DiscoveryService;
import org.apache.sling.discovery.InstanceDescription;
import org.apache.sling.discovery.InstanceFilter;
import org.apache.sling.discovery.TopologyView;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link OffloadingTopicManager} implementation that reads per-instance topic
 * properties from the Sling topology and pushes whitelist/blacklist changes to
 * a target instance over HTTP.
 *
 * <p>All topic lists are read from {@link InstanceDescription} properties as
 * comma-separated values and parsed with {@link OffloadingUtil#expandCSV}.
 */
@Component
@Service
public class OffloadingTopicManagerImpl
implements OffloadingTopicManager {
    private static final Logger LOG = LoggerFactory.getLogger(OffloadingTopicManagerImpl.class);
    private static final int NETWORK_TEST_TIMEOUT_SECONDS = 3;

    /** Socket (read) timeout applied to configuration requests, in milliseconds. */
    private static final int SOCKET_TIMEOUT_MILLIS = 10000;

    /** Topic entry that stands for "every registered topic". */
    private static final String WILDCARD = "*";

    private static final String PROP_ENABLED_TOPICS = "org.apache.sling.event.jobs.consumer.topics";
    private static final String PROP_REGISTERED_TOPICS = "com.adobe.granite.offloading.job.registeredtopics";
    private static final String PROP_ENDPOINTS = "org.apache.sling.instance.endpoints";
    private static final String PROP_WHITELIST = "job.consumermanager.whitelist";
    private static final String PROP_BLACKLIST = "job.consumermanager.blacklist";
    private static final String PROP_PERSIST = "org.apache.sling.installer.configuration.persist";

    /** Path, relative to an instance endpoint, that configuration requests are posted to. */
    private static final String CONFIG_PATH = "/libs/granite/offloading/config/jobconsumer";

    @Reference
    private DiscoveryService discoveryService;

    /**
     * Builds a map from topic name to the instances that list that topic.
     *
     * <p>For every instance in the current topology, topics found in the
     * {@code org.apache.sling.event.jobs.consumer.topics} property are recorded
     * as enabled for that instance. Topics found in the
     * {@code com.adobe.granite.offloading.job.registeredtopics} property but not
     * in the enabled list are recorded as disabled.
     *
     * @return a new mutable map keyed by topic name; never {@code null}
     */
    @Override
    public Map<String, TopicInstancesHolder> getInstances() {
        Map<String, TopicInstancesHolder> holdersByTopic = new HashMap<String, TopicInstancesHolder>();
        TopologyView topologyView = this.discoveryService.getTopology();
        Set<InstanceDescription> instances = topologyView.getInstances();
        for (InstanceDescription instance : instances) {
            Set<String> enabledTopics = OffloadingUtil.expandCSV(instance.getProperty(PROP_ENABLED_TOPICS));
            for (String topic : enabledTopics) {
                Set<InstanceDescription> enabledInstances = holderFor(holdersByTopic, topic).getEnabledInstances();
                enabledInstances.add(instance);
            }
            // Disabled = registered on the instance but not currently enabled.
            Set<String> disabledTopics = OffloadingUtil.expandCSV(instance.getProperty(PROP_REGISTERED_TOPICS));
            disabledTopics.removeAll(enabledTopics);
            for (String topic : disabledTopics) {
                Set<InstanceDescription> disabledInstances = holderFor(holdersByTopic, topic).getDisabledInstances();
                disabledInstances.add(instance);
            }
        }
        return holdersByTopic;
    }

    /**
     * Computes a new whitelist/blacklist for one instance and posts it to the
     * first endpoint of that instance that accepts it.
     *
     * <p>The current lists are read from the instance's
     * {@code job.consumermanager.whitelist} / {@code job.consumermanager.blacklist}
     * properties; a {@code "*"} entry is expanded to the instance's registered
     * topics. The requested actions are then applied in this order:
     * <ol>
     *   <li>{@code EXCLUSIVE}: the whitelist is reduced to exactly these topics
     *       and they are removed from the blacklist. When present,
     *       {@code DISABLE} is ignored.</li>
     *   <li>Topics requested in both {@code ENABLE} and {@code DISABLE} are
     *       dropped from both requests.</li>
     *   <li>{@code ENABLE}: topics are removed from the blacklist.</li>
     *   <li>{@code DISABLE}: topics are added to the blacklist.</li>
     * </ol>
     * The sets passed in {@code configuration} are only read, never modified.
     *
     * @param slingID       Sling ID of the instance to configure
     * @param configuration requested topic changes per action; must be non-empty
     * @throws OffloadingException if the instance is unknown, exposes no
     *         endpoints, the configuration is {@code null} or empty, or no
     *         endpoint answered with HTTP 200 or 201
     */
    @Override
    public void configureInstance(String slingID, Map<TopicConfigurationAction, Set<String>> configuration) {
        InstanceDescription instance = this.findInstanceByID(slingID);
        if (instance == null) {
            throw new OffloadingException("Cannot find instance with Sling ID " + slingID);
        }
        String endpointsString = instance.getProperty(PROP_ENDPOINTS);
        if (StringUtils.isEmpty(endpointsString)) {
            throw new OffloadingException("Cannot detect endpoints for instance with Sling ID " + slingID);
        }
        if (configuration == null || configuration.isEmpty()) {
            throw new OffloadingException("Empty configuration map received for instance with Sling ID " + slingID);
        }

        Set<String> registered = this.getRegisteredTopics(instance);
        Set<String> blacklist = expandWildcard(OffloadingUtil.expandCSV(instance.getProperty(PROP_BLACKLIST)), registered);
        Set<String> whitelist = expandWildcard(OffloadingUtil.expandCSV(instance.getProperty(PROP_WHITELIST)), registered);

        Set<String> exclusive = configuration.get(TopicConfigurationAction.EXCLUSIVE);
        if (exclusive != null) {
            // The whitelist ends up holding exactly the exclusive topics.
            whitelist.retainAll(exclusive);
            whitelist.addAll(exclusive);
            blacklist.removeAll(exclusive);
        }

        Set<String> enable = configuration.get(TopicConfigurationAction.ENABLE);
        if (enable != null && enable.contains(WILDCARD)) {
            enable = new HashSet<String>(registered);
        }
        // An EXCLUSIVE request overrides any DISABLE request.
        Set<String> disable = exclusive != null ? null : configuration.get(TopicConfigurationAction.DISABLE);
        if (disable != null && disable.contains(WILDCARD)) {
            disable = new HashSet<String>(registered);
        }

        // Topics requested for both ENABLE and DISABLE cancel out. They are
        // collected here and skipped below so the caller's sets stay untouched.
        Set<String> conflicting = new HashSet<String>();
        if (enable != null && disable != null) {
            conflicting.addAll(enable);
            conflicting.retainAll(disable);
        }

        if (enable != null) {
            for (String topic : enable) {
                if (conflicting.contains(topic)) {
                    continue;
                }
                blacklist.remove(topic);
                if (!registered.equals(whitelist)) {
                    // NOTE(review): enabling a topic removes it from a
                    // non-wildcard whitelist; one might expect add() here.
                    // Behaviour is kept as found - confirm the intent.
                    whitelist.remove(topic);
                }
            }
        }
        if (disable != null) {
            for (String topic : disable) {
                if (conflicting.contains(topic)) {
                    continue;
                }
                blacklist.add(topic);
                if (!registered.equals(whitelist)) {
                    whitelist.remove(topic);
                }
            }
        }

        List<BasicNameValuePair> nvps = new ArrayList<BasicNameValuePair>();
        // An empty whitelist, or one equal to the registered topics, is sent as "*".
        if (whitelist.isEmpty() || registered.equals(whitelist)) {
            nvps.add(new BasicNameValuePair(PROP_WHITELIST, WILDCARD));
        } else {
            for (String topic : whitelist) {
                nvps.add(new BasicNameValuePair(PROP_WHITELIST, topic));
            }
        }
        if (blacklist.isEmpty()) {
            nvps.add(new BasicNameValuePair(PROP_BLACKLIST, ""));
        } else {
            for (String topic : blacklist) {
                nvps.add(new BasicNameValuePair(PROP_BLACKLIST, topic));
            }
        }
        nvps.add(new BasicNameValuePair(PROP_PERSIST, Boolean.FALSE.toString()));

        int statusCode = this.postConfiguration(OffloadingUtil.expandCSV(endpointsString), nvps);
        if (!isSuccess(statusCode)) {
            throw new OffloadingException("Unable to send configuration request to instance with Sling ID " + slingID + ". " + "Got status code " + statusCode);
        }
    }

    /**
     * Returns the topics listed in the instance's
     * {@code com.adobe.granite.offloading.job.registeredtopics} property.
     *
     * @param instance instance to inspect
     * @return the parsed topic set as produced by {@link OffloadingUtil#expandCSV}
     */
    @Override
    public Set<String> getRegisteredTopics(InstanceDescription instance) {
        return OffloadingUtil.expandCSV(instance.getProperty(PROP_REGISTERED_TOPICS));
    }

    /**
     * Returns the topics listed in the instance's
     * {@code job.consumermanager.whitelist} property. If the list contains
     * {@code "*"}, the instance's registered topics are returned instead.
     *
     * @param instance instance to inspect
     * @return the whitelisted topics
     */
    @Override
    public Set<String> getWhitelistedTopics(InstanceDescription instance) {
        Set<String> whitelist = OffloadingUtil.expandCSV(instance.getProperty(PROP_WHITELIST));
        return whitelist.contains(WILDCARD) ? this.getRegisteredTopics(instance) : whitelist;
    }

    /**
     * Returns the topics listed in the instance's
     * {@code job.consumermanager.blacklist} property. If the list contains
     * {@code "*"}, the instance's registered topics are returned instead.
     *
     * @param instance instance to inspect
     * @return the blacklisted topics
     */
    @Override
    public Set<String> getBlacklistedTopics(InstanceDescription instance) {
        Set<String> blacklist = OffloadingUtil.expandCSV(instance.getProperty(PROP_BLACKLIST));
        return blacklist.contains(WILDCARD) ? this.getRegisteredTopics(instance) : blacklist;
    }

    /** Returns the holder stored for {@code topic}, creating and storing one if absent. */
    private static TopicInstancesHolder holderFor(Map<String, TopicInstancesHolder> holdersByTopic, String topic) {
        TopicInstancesHolder holder = holdersByTopic.get(topic);
        if (holder == null) {
            holder = new TopicInstancesHolderImpl(topic);
            holdersByTopic.put(topic, holder);
        }
        return holder;
    }

    /** Returns a copy of {@code registered} if {@code topics} contains "*", otherwise {@code topics} itself. */
    private static Set<String> expandWildcard(Set<String> topics, Set<String> registered) {
        return topics.contains(WILDCARD) ? new HashSet<String>(registered) : topics;
    }

    /** Tells whether an HTTP status code counts as an accepted configuration request (200 or 201). */
    private static boolean isSuccess(int statusCode) {
        return statusCode == 200 || statusCode == 201;
    }

    /**
     * Posts the form parameters to each endpoint in turn and stops at the first
     * one that answers with HTTP 200 or 201.
     *
     * <p>Endpoints whose host fails the reachability probe, and endpoints whose
     * request throws, are logged and skipped.
     *
     * @param endpoints endpoint base URLs of the target instance
     * @param nvps      form parameters to send
     * @return the status code of the last response received, or {@code 0} if no
     *         endpoint produced a response
     */
    private int postConfiguration(Set<String> endpoints, List<BasicNameValuePair> nvps) {
        int statusCode = 0;
        CloseableHttpClient client = this.getHTTPClient();
        try {
            for (String rawEndpoint : endpoints) {
                String host = OffloadingUtil.getHostFromEndpoint(rawEndpoint);
                try {
                    // NOTE(review): InetAddress.isReachable may report false for
                    // hosts that block ICMP/echo even though HTTP works - confirm
                    // this probe is appropriate for the deployment.
                    if (!InetAddress.getByName(host).isReachable(NETWORK_TEST_TIMEOUT_SECONDS * 1000)) {
                        LOG.error("Unreachable host: " + rawEndpoint);
                        continue;
                    }
                }
                catch (Exception e) {
                    LOG.error("Unreachable host: " + rawEndpoint, (Throwable)e);
                    continue;
                }

                String endpoint = rawEndpoint.endsWith("/")
                        ? rawEndpoint.substring(0, rawEndpoint.length() - 1)
                        : rawEndpoint;
                String url = endpoint + CONFIG_PATH;
                CloseableHttpResponse response = null;
                try {
                    HttpPost post = new HttpPost(url);
                    post.setEntity((HttpEntity)new UrlEncodedFormEntity(nvps));
                    response = client.execute((HttpUriRequest)post);
                    statusCode = response.getStatusLine().getStatusCode();
                }
                catch (Exception ex) {
                    LOG.error("Unable to send configuration request to " + url, (Throwable)ex);
                }
                finally {
                    if (response != null) {
                        try {
                            response.close();
                        }
                        catch (Exception closeFailure) {
                            LOG.warn("Unable to close response from " + url, (Throwable)closeFailure);
                        }
                    }
                }
                // Stop at the first endpoint that accepted the configuration.
                if (isSuccess(statusCode)) {
                    break;
                }
            }
        }
        finally {
            try {
                client.close();
            }
            catch (Exception closeFailure) {
                LOG.warn("Unable to close HTTP client", (Throwable)closeFailure);
            }
        }
        return statusCode;
    }

    /**
     * Looks up an instance in the current topology by its Sling ID.
     *
     * @param slingID Sling ID to search for; may be {@code null}
     * @return the first matching instance, or {@code null} if there is none
     */
    private InstanceDescription findInstanceByID(final String slingID) {
        if (slingID == null) {
            return null;
        }
        TopologyView topologyView = this.discoveryService.getTopology();
        Set<InstanceDescription> matches = topologyView.findInstances(new InstanceFilter(){

            @Override
            public boolean accept(InstanceDescription candidate) {
                return slingID.equals(candidate.getSlingId());
            }
        });
        if (matches == null || matches.isEmpty()) {
            return null;
        }
        return matches.iterator().next();
    }

    /**
     * Creates an HTTP client whose default request configuration uses a
     * 10 second socket timeout. The caller is responsible for closing it.
     */
    private CloseableHttpClient getHTTPClient() {
        RequestConfig config = RequestConfig.custom().setSocketTimeout(SOCKET_TIMEOUT_MILLIS).build();
        return HttpClients.custom().setDefaultRequestConfig(config).build();
    }

    /** Stores the given discovery service for use by this component. */
    protected void bindDiscoveryService(DiscoveryService discoveryService) {
        this.discoveryService = discoveryService;
    }

    /** Clears the stored discovery service, but only if it is the one being unbound. */
    protected void unbindDiscoveryService(DiscoveryService discoveryService) {
        if (this.discoveryService == discoveryService) {
            this.discoveryService = null;
        }
    }

}