// OffloadingProcessor.java 10.1 KB
/*
 * Decompiled with CFR 0_118.
 * 
 * Could not load the following classes:
 *  org.apache.commons.lang.StringUtils
 *  org.apache.sling.discovery.InstanceDescription
 *  org.apache.sling.discovery.InstanceFilter
 *  org.apache.sling.discovery.TopologyEvent
 *  org.apache.sling.discovery.TopologyEvent$Type
 *  org.apache.sling.discovery.TopologyView
 *  org.osgi.service.event.Event
 *  org.slf4j.Logger
 *  org.slf4j.LoggerFactory
 */
package com.adobe.granite.offloading.impl;

import com.adobe.granite.offloading.impl.OffloadingConfigurator;
import com.adobe.granite.offloading.impl.OffloadingEventProcessor;
import com.adobe.granite.offloading.impl.OffloadingInstanceFilter;
import com.adobe.granite.offloading.impl.util.OffloadingUtil;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang.StringUtils;
import org.apache.sling.discovery.InstanceDescription;
import org.apache.sling.discovery.InstanceFilter;
import org.apache.sling.discovery.TopologyEvent;
import org.apache.sling.discovery.TopologyView;
import org.osgi.service.event.Event;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Queues OSGi events and hands them, one at a time, to an
 * {@link OffloadingEventProcessor} on a dedicated daemon thread.
 *
 * <p>The worker can be started, paused, resumed and stopped. When the
 * processor is topology aware it additionally reacts to topology events:
 * it pauses while the topology is changing, caches the offloading instances
 * of the current {@link TopologyView}, and resumes (or starts) once a new
 * view has been published.
 *
 * <p>State is kept in atomics, a volatile view reference and a concurrent
 * map; the pause/resume hand-off is guarded by {@code pauseLock}.
 */
public class OffloadingProcessor implements Runnable {
    private static final String STOP_TOPIC = "com/adobe/granite/offloading/stop/topic";
    /** Marker event enqueued by {@link #stop()} to wake a worker blocked on the queue. */
    private static final Event STOP_EVENT = new Event(STOP_TOPIC, new HashMap<String, Object>());
    private static final String PAUSE_TOPIC = "com/adobe/granite/offloading/pause/topic";
    /** Marker event enqueued by {@link #pause()} to wake a worker blocked on the queue. */
    private static final Event PAUSE_EVENT = new Event(PAUSE_TOPIC, new HashMap<String, Object>());

    private final Logger log;
    /** Monitor the worker waits on while paused; {@link #resume()} notifies it. */
    private final Object pauseLock;
    private final AtomicBoolean isRunning;
    private final AtomicBoolean paused;
    private final BlockingQueue<Event> eventQueue;
    /** Offloading instances of the current topology view, keyed by Sling id. */
    private final Map<String, InstanceDescription> offloadingInstancesCache;
    private final AtomicBoolean isTopoStable;
    private final OffloadingEventProcessor eventProcessor;
    private final OffloadingConfigurator configurator;
    private volatile TopologyView topoView;
    private final String threadName;
    private final boolean topologyAware;

    /**
     * Creates a stopped, paused processor.
     *
     * @param eventProcessor decides which events are queued and processes them
     * @param configurator   supplies the Sling job path used by {@link #isReassignEvent(Event)}
     * @param topologyAware  whether topology events drive pause/resume/start
     * @param threadName     name given to the worker thread and used in log output
     */
    public OffloadingProcessor(OffloadingEventProcessor eventProcessor, OffloadingConfigurator configurator, boolean topologyAware, String threadName) {
        this.log = LoggerFactory.getLogger(this.getClass());
        this.pauseLock = new Object();
        this.isRunning = new AtomicBoolean(false);
        this.paused = new AtomicBoolean(true);
        this.eventQueue = new LinkedBlockingQueue<Event>();
        this.offloadingInstancesCache = new ConcurrentHashMap<String, InstanceDescription>();
        this.eventProcessor = eventProcessor;
        this.configurator = configurator;
        this.threadName = threadName;
        this.topologyAware = topologyAware;
        // A processor that ignores topology events never receives a view, so it
        // is considered stable from the beginning; otherwise it waits for one.
        this.isTopoStable = new AtomicBoolean(!topologyAware);
    }

    /** @return {@code true} while the processor is started and not stopped */
    public boolean isRunning() {
        return this.isRunning.get();
    }

    /** @return {@code true} while the processor is paused (also the initial and stopped state) */
    public boolean isPaused() {
        return this.paused.get();
    }

    /** @return {@code true} when the topology is considered stable */
    public boolean isTopoStable() {
        return this.isTopoStable.get();
    }

    /**
     * Starts the daemon worker thread if the topology is stable and the
     * processor is not already running. Does nothing otherwise.
     */
    public void start() {
        // compareAndSet makes "check not running, then mark running" atomic so
        // that concurrent callers cannot spawn more than one worker thread.
        if (this.isTopoStable.get() && this.isRunning.compareAndSet(false, true)) {
            this.log.debug("New Offloading Processor: [{}]", this.threadName);
            this.paused.set(false);
            Thread loaderThread = new Thread(this, this.threadName);
            loaderThread.setDaemon(true);
            loaderThread.start();
        }
    }

    /**
     * Stops the worker: clears the running flag, wakes the worker whether it
     * is waiting on the pause lock or blocked on the queue, and leaves the
     * processor in the paused state.
     */
    public void stop() {
        this.isRunning.set(false);
        // Wake a worker that is waiting on the pause lock so it can observe the cleared flag.
        this.resume();
        this.paused.set(true);
        this.log.debug("Stopping: [{}]", this.threadName);
        // Wake a worker that is blocked on an empty queue.
        this.enqueue(STOP_EVENT);
    }

    /**
     * Pauses event processing if it is not paused already. Events keep being
     * queued while paused.
     */
    public void pause() {
        if (this.paused.compareAndSet(false, true)) {
            this.log.debug("Pausing: [{}]", this.threadName);
            // Wake a worker that is blocked on an empty queue so it notices the pause.
            this.enqueue(PAUSE_EVENT);
        }
    }

    /** Resumes event processing if it is currently paused. */
    public void resume() {
        synchronized (this.pauseLock) {
            if (this.paused.get()) {
                this.log.debug("Resuming: [{}]", this.threadName);
                this.paused.set(false);
                this.pauseLock.notify();
            }
        }
    }

    /**
     * Queues the event for asynchronous processing when the processor is
     * running and the event processor accepts the event.
     *
     * @param event the OSGi event to consider
     */
    public void handleEvent(Event event) {
        if (this.isRunning.get() && this.eventProcessor.doHandleEvent(event)) {
            String topic = event.getTopic();
            String path = (String)event.getProperty("path");
            this.log.debug("adding event with topic {} on path {} to event queue for processing", topic, path);
            this.enqueue(event);
        }
    }

    /**
     * Reacts to a topology event when the processor is topology aware;
     * ignored otherwise.
     *
     * <ul>
     * <li>{@code TOPOLOGY_CHANGING}: marks the topology unstable, pauses and
     * drops the cached view.</li>
     * <li>{@code TOPOLOGY_CHANGED} / {@code TOPOLOGY_INIT}: pauses, replaces the
     * cached view with the new one, marks the topology stable and then
     * resumes a running processor or starts a stopped one.</li>
     * <li>Any other type is ignored.</li>
     * </ul>
     *
     * @param event the topology event
     */
    public void handleTopologyEvent(TopologyEvent event) {
        if (!this.topologyAware) {
            return;
        }
        TopologyEvent.Type type = event.getType();
        this.log.debug("process topology event type {}", type);
        switch (type) {
            case TOPOLOGY_CHANGING: {
                this.isTopoStable.set(false);
                this.pause();
                this.clearTopoView();
                break;
            }
            case TOPOLOGY_CHANGED:
            case TOPOLOGY_INIT: {
                this.pause();
                this.clearTopoView();
                this.updateTopoView(event.getNewView());
                this.isTopoStable.set(true);
                if (this.isRunning.get()) {
                    this.resume();
                } else {
                    this.start();
                }
                break;
            }
            default: {
                // Other event types do not affect processing.
                break;
            }
        }
    }

    /**
     * Tells whether the given Sling id belongs to a cached offloading instance.
     *
     * @param targetInstance Sling id to look up; {@code null} is never an offloading instance
     * @return {@code true} if the topology is stable and the id is cached;
     *         {@code false} otherwise (a warning is logged when the topology is not stable)
     */
    public boolean isOffloadingInstance(String targetInstance) {
        if (this.isTopoStable.get()) {
            // ConcurrentHashMap rejects null keys, so answer for null without asking the map.
            return targetInstance != null && this.offloadingInstancesCache.containsKey(targetInstance);
        }
        this.log.warn("Not able to determine if given targetInstance {} is an offloading instance. Looks like the topology is in changing state.", targetInstance);
        return false;
    }

    /**
     * @return the Sling id of the local instance, or an empty string when no
     *         view is cached or the topology is not stable (the latter is logged)
     */
    public String getSelfInstanceId() {
        if (this.isTopoStable.get()) {
            // Read the volatile field once: another thread may clear it between
            // a null check and the dereference.
            TopologyView view = this.topoView;
            return view != null ? view.getLocalInstance().getSlingId() : "";
        }
        this.log.warn("Not able to determine self instance id. Looks like the topology is in changing state.");
        return "";
    }

    /**
     * Tells whether the event is the removal of a job resource below the Sling
     * job path that was assigned to an instance which is neither this instance
     * nor a known offloading instance.
     *
     * @param event the event to inspect
     * @return {@code true} if all of the above holds; {@code false} otherwise,
     *         including when the event carries no {@code path} property
     */
    public boolean isReassignEvent(Event event) {
        String path = (String)event.getProperty("path");
        if (path == null) {
            // Without a path the event cannot refer to a job resource.
            return false;
        }
        String topic = event.getTopic();
        String jobPath = this.configurator.getSlingJobPath();
        String assignedInstanceId = OffloadingUtil.getAssignedInstanceFromJobPath(jobPath, path);
        String selfInstanceId = this.getSelfInstanceId();
        return "org/apache/sling/api/resource/Resource/REMOVED".equals(topic)
                && path.startsWith(jobPath)
                && StringUtils.isNotBlank(assignedInstanceId)
                && StringUtils.isNotBlank(selfInstanceId)
                && !selfInstanceId.equals(assignedInstanceId)
                && !this.isOffloadingInstance(assignedInstanceId);
    }

    /**
     * Worker loop. While running it alternates between waiting on the pause
     * lock (when paused) and draining the event queue (when not paused).
     * The internal pause/stop marker events only serve to unblock the queue
     * and are not forwarded to the event processor. An interrupt in either
     * phase restores the interrupt flag and ends the loop.
     */
    @Override
    public void run() {
        this.log.debug("Start: [{}]", this.threadName);
        while (this.isRunning.get()) {
            synchronized (this.pauseLock) {
                while (this.isRunning.get() && this.paused.get()) {
                    this.log.debug("Paused: [{}]", this.threadName);
                    try {
                        this.pauseLock.wait();
                    } catch (InterruptedException e) {
                        this.log.error(e.getMessage(), e);
                        Thread.currentThread().interrupt();
                        this.isRunning.set(false);
                    }
                }
                this.log.debug("Resumed: [{}]", this.threadName);
            }
            while (this.isRunning.get() && !this.paused.get()) {
                try {
                    Event event = this.eventQueue.take();
                    String topic = event.getTopic();
                    if (PAUSE_TOPIC.equals(topic)) {
                        this.log.debug("Processing pause event, do nothing");
                        continue;
                    }
                    if (STOP_TOPIC.equals(topic)) {
                        this.log.debug("Processing stop event, do nothing");
                        continue;
                    }
                    this.eventProcessor.processEvent(event);
                } catch (InterruptedException e) {
                    // Same policy as the pause wait above: keep the interrupt
                    // visible and shut the worker down instead of looping on.
                    this.log.error(e.getMessage(), e);
                    Thread.currentThread().interrupt();
                    this.isRunning.set(false);
                }
            }
        }
        this.log.debug("Stopped: [{}]", this.threadName);
    }

    /** Puts the event on the queue; if interrupted, logs and restores the interrupt flag. */
    private void enqueue(Event event) {
        try {
            this.eventQueue.put(event);
        } catch (InterruptedException e) {
            this.log.error(e.getMessage(), e);
            Thread.currentThread().interrupt();
        }
    }

    /** Drops the cached topology view and the offloading instances derived from it. */
    private void clearTopoView() {
        this.topoView = null;
        this.offloadingInstancesCache.clear();
    }

    /** Stores the new view and caches its offloading instances by Sling id. */
    private void updateTopoView(TopologyView newView) {
        this.topoView = newView;
        // Work on the parameter rather than re-reading the volatile field,
        // which another thread may have cleared in the meantime.
        InstanceDescription mySelf = newView.getLocalInstance();
        Set<InstanceDescription> offloadingInstances = newView.findInstances((InstanceFilter)new OffloadingInstanceFilter(mySelf));
        for (InstanceDescription instance : offloadingInstances) {
            this.offloadingInstancesCache.put(instance.getSlingId(), instance);
        }
    }

}