// OffloadingJobOffloader.java 15.3 KB
/*
 * Decompiled with CFR 0_118.
 * 
 * Could not load the following classes:
 *  org.apache.commons.lang.StringUtils
 *  org.apache.felix.scr.annotations.Activate
 *  org.apache.felix.scr.annotations.Component
 *  org.apache.felix.scr.annotations.Deactivate
 *  org.apache.felix.scr.annotations.Properties
 *  org.apache.felix.scr.annotations.Property
 *  org.apache.felix.scr.annotations.Reference
 *  org.apache.felix.scr.annotations.ReferenceCardinality
 *  org.apache.felix.scr.annotations.ReferencePolicy
 *  org.apache.felix.scr.annotations.Service
 *  org.apache.sling.api.resource.LoginException
 *  org.apache.sling.api.resource.ModifiableValueMap
 *  org.apache.sling.api.resource.PersistenceException
 *  org.apache.sling.api.resource.Resource
 *  org.apache.sling.api.resource.ResourceResolver
 *  org.apache.sling.api.resource.ResourceResolverFactory
 *  org.apache.sling.api.resource.ResourceUtil
 *  org.apache.sling.api.resource.ValueMap
 *  org.apache.sling.api.wrappers.ValueMapDecorator
 *  org.apache.sling.commons.osgi.PropertiesUtil
 *  org.osgi.service.event.Event
 *  org.osgi.service.event.EventHandler
 *  org.slf4j.Logger
 *  org.slf4j.LoggerFactory
 */
package com.adobe.granite.offloading.impl;

import com.adobe.granite.offloading.api.OffloadingException;
import com.adobe.granite.offloading.api.OffloadingJobProperties;
import com.adobe.granite.offloading.impl.OffloadingConfigurator;
import com.adobe.granite.offloading.impl.OffloadingEventProcessor;
import com.adobe.granite.offloading.impl.OffloadingLocations;
import com.adobe.granite.offloading.impl.OffloadingProcessor;
import com.adobe.granite.offloading.impl.OffloadingStatus;
import com.adobe.granite.offloading.impl.OffloadingTransporter;
import com.adobe.granite.offloading.impl.util.OffloadingResourceUtil;
import java.util.Calendar;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.lang.StringUtils;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Properties;
import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.ReferencePolicy;
import org.apache.felix.scr.annotations.Service;
import org.apache.sling.api.resource.LoginException;
import org.apache.sling.api.resource.ModifiableValueMap;
import org.apache.sling.api.resource.PersistenceException;
import org.apache.sling.api.resource.Resource;
import org.apache.sling.api.resource.ResourceResolver;
import org.apache.sling.api.resource.ResourceResolverFactory;
import org.apache.sling.api.resource.ResourceUtil;
import org.apache.sling.api.resource.ValueMap;
import org.apache.sling.api.wrappers.ValueMapDecorator;
import org.apache.sling.commons.osgi.PropertiesUtil;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * OSGi {@link EventHandler} that reacts to resource ADDED/CHANGED events and moves
 * offloading job resources between master and worker through a bound
 * {@link OffloadingTransporter}.
 *
 * <p>Incoming events are handed to an {@link OffloadingProcessor}, which calls back into
 * {@link #doHandleEvent(Event)} and {@link #processEvent(Event)} on this class. The
 * component only forwards events while the {@code offloading.offloader.enabled}
 * configuration property is {@code true}.
 *
 * <p>NOTE(review): the {@code event.filter} excludes events that carry an
 * {@code event.application} property - presumably to skip remote events; confirm.
 */
@Component(metatype=true, label="Adobe Granite Offloading - Job Offloader")
@Service(value={EventHandler.class})
@Properties(value={@Property(name="event.topics", value={"org/apache/sling/api/resource/Resource/ADDED", "org/apache/sling/api/resource/Resource/CHANGED"}, propertyPrivate=true), @Property(name="event.filter", value={"(!(event.application=*))"}, propertyPrivate=true)})
@Reference(referenceInterface=OffloadingTransporter.class, cardinality=ReferenceCardinality.MANDATORY_MULTIPLE, policy=ReferencePolicy.DYNAMIC, name="transporter")
public class OffloadingJobOffloader
implements EventHandler,
OffloadingEventProcessor {
    /** Sub service name used to obtain the service resource resolver. */
    private static final String SUB_SERVICE_NAME = "offloading";

    /** Value used when the "enabled" configuration property is absent. */
    private static final boolean OFFLOADER_ENABLED_DEFAULT = false;

    @Property(label="Offloader Enabled", description="Allows to enable/disable the offloader in general.", boolValue={false})
    private static final String OFFLOADER_ENABLED = "offloading.offloader.enabled";

    /** Service property under which a transporter announces its name. */
    private static final String TRANSPORTER_NAME_PROPERTY = "offloading.transporter.name";

    /*
     * Key used for the fallback lookup in getTransporter().
     * NOTE(review): this is the same string as the service property name above, which looks
     * like an inlined constant rather than an actual transporter name - confirm against the
     * original source which key the default transporter is registered under.
     */
    private static final String FALLBACK_TRANSPORTER_KEY = "offloading.transporter.name";

    /** Event property holding the path of the affected resource. */
    private static final String EVENT_PROPERTY_PATH = "path";

    /** Event property holding the resource type of the affected resource. */
    private static final String EVENT_PROPERTY_RESOURCE_TYPE = "resourceType";

    /** Error text stored on the job when a transport fails without a message. */
    private static final String DEFAULT_TRANSPORT_ERROR = "Transport error, but no reason given.";

    private final Logger log = LoggerFactory.getLogger(this.getClass());

    /*
     * Transporters by name. Declared with the concrete type because unbindTransporter relies
     * on the atomic remove(key, value) operation.
     */
    private final ConcurrentHashMap<String, OffloadingTransporter> transporterMap = new ConcurrentHashMap<String, OffloadingTransporter>();

    /* Written by activate/modified/deactivate, read by handleEvent on event threads. */
    private volatile boolean offloaderEnabled = OFFLOADER_ENABLED_DEFAULT;
    private volatile OffloadingProcessor offloadingProcessor;

    @Reference
    private ResourceResolverFactory resolverFactory;
    @Reference
    private OffloadingConfigurator configurator;

    /** Creates the component; references are injected later through the bind methods. */
    public OffloadingJobOffloader() {
    }

    /**
     * Creates the event processor and applies the component configuration.
     *
     * @param props component configuration
     */
    @Activate
    protected void activate(Map<String, Object> props) {
        this.offloadingProcessor = new OffloadingProcessor(this, this.configurator, false, "Adobe Granite Offloading: Job Offloader");
        this.modified(props);
    }

    /**
     * Reads the "enabled" flag from the configuration and starts or stops the processor
     * accordingly.
     *
     * @param props component configuration
     */
    protected void modified(Map<String, Object> props) {
        final ValueMap properties = new ValueMapDecorator(props);
        final Boolean enabled = properties.get(OFFLOADER_ENABLED, Boolean.valueOf(OFFLOADER_ENABLED_DEFAULT));
        this.offloaderEnabled = enabled != null && enabled.booleanValue();

        final OffloadingProcessor processor = this.offloadingProcessor;
        if (processor == null) {
            return;
        }
        if (this.offloaderEnabled) {
            processor.start();
        } else {
            processor.stop();
        }
    }

    /** Stops the processor and stops forwarding events. */
    @Deactivate
    protected void deactivate() {
        // Disable first so that an event arriving during shutdown is not forwarded
        // to a processor that is about to be released.
        this.offloaderEnabled = false;
        final OffloadingProcessor processor = this.offloadingProcessor;
        this.offloadingProcessor = null;
        if (processor != null) {
            processor.stop();
        }
    }

    /**
     * Registers a transporter under the name given by its
     * {@code offloading.transporter.name} service property.
     *
     * @param transporter the transporter service
     * @param properties  the service properties of the transporter
     */
    protected void bindTransporter(OffloadingTransporter transporter, Map<String, Object> properties) {
        final String transporterName = PropertiesUtil.toString(properties.get(TRANSPORTER_NAME_PROPERTY), "");
        if (StringUtils.isNotBlank(transporterName)) {
            this.transporterMap.put(transporterName, transporter);
        } else {
            this.log.warn("Not able to bind transporter {}, because of missing service property", transporter.getClass().getCanonicalName());
        }
    }

    /**
     * Removes a transporter registered by {@link #bindTransporter}.
     *
     * @param transporter the transporter service
     * @param properties  the service properties of the transporter
     */
    protected void unbindTransporter(OffloadingTransporter transporter, Map<String, Object> properties) {
        final String transporterName = PropertiesUtil.toString(properties.get(TRANSPORTER_NAME_PROPERTY), "");
        if (StringUtils.isNotBlank(transporterName)) {
            // Only remove the entry if it still maps to this transporter; a replacement
            // registered under the same name must stay bound.
            this.transporterMap.remove(transporterName, transporter);
        } else {
            this.log.warn("Not able to unbind transporter {}, because of missing service property", transporter.getClass().getCanonicalName());
        }
    }

    /**
     * Forwards the event to the processor while the offloader is enabled and active.
     *
     * @param event the resource event
     */
    @Override
    public void handleEvent(Event event) {
        final OffloadingProcessor processor = this.offloadingProcessor;
        if (this.offloaderEnabled && processor != null) {
            processor.handleEvent(event);
        }
    }

    /**
     * Checks whether the event refers to a resource below the configured offloading job
     * path whose resource type equals the configured job type.
     */
    private boolean isEventForOffloader(Event event) {
        final String path = (String)event.getProperty(EVENT_PROPERTY_PATH);
        if (StringUtils.isBlank(path)) {
            return false;
        }
        final String resourceType = (String)event.getProperty(EVENT_PROPERTY_RESOURCE_TYPE);
        if (StringUtils.isBlank(resourceType)) {
            return false;
        }
        return path.startsWith(this.configurator.getOffloadingJobPath())
            && resourceType.equals(this.configurator.getJobType());
    }

    /**
     * Looks up the configured transporter, falling back to the fallback key.
     *
     * @return the transporter to use, never {@code null}
     * @throws OffloadingException if neither lookup yields a transporter
     */
    private OffloadingTransporter getTransporter() {
        final String configuredName = this.configurator.getTransporterName();
        // ConcurrentHashMap rejects null keys, so treat a missing name as "not found".
        OffloadingTransporter transporter = configuredName == null ? null : this.transporterMap.get(configuredName);
        if (transporter == null) {
            this.log.warn("no transporter found for configured transporter {}, will try to fallback to default transporter.", configuredName);
            transporter = this.transporterMap.get(FALLBACK_TRANSPORTER_KEY);
        }
        if (transporter == null) {
            throw new OffloadingException("no transporter is bound or found for configured transporter " + configuredName);
        }
        this.log.debug("Using transporter: {}", transporter.getClass().getName());
        return transporter;
    }

    /**
     * Marks the job as located on the worker and hands it to the transporter. If the
     * transport fails with an {@link OffloadingException}, the job is reset to the master
     * location and the error message is stored on it.
     *
     * @throws PersistenceException if the job properties cannot be persisted
     */
    private void transportToWorker(Resource offloadingJobResource) throws PersistenceException {
        if (!this.isOffloadingJob(offloadingJobResource)) {
            return;
        }
        try {
            this.markJobOffloadedToWorker(offloadingJobResource);
            this.getTransporter().transportToWorker(offloadingJobResource);
        }
        catch (OffloadingException e) {
            this.log.error(e.getMessage(), e);
            this.markJobToWorkerFailed(offloadingJobResource, e.getMessage());
        }
    }

    /**
     * Marks the job as located on the master, hands it to the transporter and deletes the
     * job afterwards. If this fails with an {@link OffloadingException}, the job is reset
     * to the worker location and the error message is stored on it.
     *
     * @throws PersistenceException if the job properties cannot be persisted
     */
    private void transportToMaster(Resource offloadingJobResource) throws PersistenceException {
        if (!this.isOffloadingJob(offloadingJobResource)) {
            return;
        }
        try {
            this.markJobBackOnMaster(offloadingJobResource);
            this.getTransporter().transportToMaster(offloadingJobResource);
            OffloadingResourceUtil.deleteJob(offloadingJobResource, this.configurator.getSlingJobPath(), this.configurator.getOffloadingJobPath());
        }
        catch (OffloadingException e) {
            this.log.error(e.getMessage(), e);
            this.markJobToMasterFailed(offloadingJobResource, e.getMessage());
        }
    }

    /**
     * Resolves the job resource for the event with a service resolver and transports it to
     * the worker or back to the master, depending on its location and status. Pending
     * changes are committed; any exception is logged and not propagated. The resolver is
     * always closed.
     *
     * @param event the resource event to process
     */
    @Override
    public void processEvent(Event event) {
        this.log.debug("process resource event {}", event.getTopic());
        ResourceResolver resolver = null;
        try {
            resolver = this.getServiceResolver();
            final Resource jobResource = OffloadingResourceUtil.getJobResourceFromEvent(resolver, event);
            if (jobResource != null) {
                if (this.isNewJobForWorker(jobResource)) {
                    this.transportToWorker(jobResource);
                } else if (this.isNewJobForMaster(jobResource)) {
                    this.transportToMaster(jobResource);
                }
            }
            if (resolver.hasChanges()) {
                resolver.commit();
            }
        }
        catch (Exception e) {
            // Boundary of the event callback: log and swallow so the caller is not affected.
            this.log.error(e.getMessage(), e);
        }
        finally {
            if (resolver != null) {
                resolver.close();
            }
        }
    }

    /**
     * Tells the processor whether this component wants to process the event.
     *
     * @param event the resource event
     * @return {@code true} if the event targets an offloading job resource
     */
    @Override
    public boolean doHandleEvent(Event event) {
        return this.isEventForOffloader(event);
    }

    /** Reads a string property from the map, yielding an empty string when it is absent. */
    private static String readString(ValueMap props, String name) {
        return props.get(name, "");
    }

    /**
     * A job is due for the worker when it is located on the master, has status NEW and
     * carries no transport error.
     */
    private boolean isNewJobForWorker(Resource jobResource) {
        if (jobResource == null) {
            return false;
        }
        final ValueMap props = ResourceUtil.getValueMap(jobResource);
        final String location = readString(props, OffloadingJobProperties.LOCATION.propertyName());
        final String status = readString(props, OffloadingJobProperties.STATUS.propertyName());
        final String transporterError = readString(props, OffloadingJobProperties.TRANSPORT_ERROR.propertyName());
        return location.equals(OffloadingLocations.MASTER.getPropertyValue())
            && status.equals(OffloadingStatus.NEW.getStatusValue())
            && StringUtils.isBlank(transporterError);
    }

    /**
     * A job is due for the master when it is located on the worker, its status is none of
     * NEW, STARTED or FAILED, and it carries no transport error.
     */
    private boolean isNewJobForMaster(Resource jobResource) {
        if (jobResource == null) {
            return false;
        }
        final ValueMap props = ResourceUtil.getValueMap(jobResource);
        final String location = readString(props, OffloadingJobProperties.LOCATION.propertyName());
        final String status = readString(props, OffloadingJobProperties.STATUS.propertyName());
        final String transporterError = readString(props, OffloadingJobProperties.TRANSPORT_ERROR.propertyName());
        return location.equals(OffloadingLocations.WORKER.getPropertyValue())
            && !status.equals(OffloadingStatus.NEW.getStatusValue())
            && !status.equals(OffloadingStatus.STARTED.getStatusValue())
            && !status.equals(OffloadingStatus.FAILED.getStatusValue())
            && StringUtils.isBlank(transporterError);
    }

    /**
     * Adapts the job resource to a writable property map.
     *
     * @throws PersistenceException if the resource cannot be adapted, instead of failing
     *         later with a {@link NullPointerException}
     */
    private ModifiableValueMap getModifiableProperties(Resource jobResource) throws PersistenceException {
        final ModifiableValueMap jobProperties = jobResource.adaptTo(ModifiableValueMap.class);
        if (jobProperties == null) {
            throw new PersistenceException("Unable to modify offloading job " + jobResource.getPath() + ": resource is not adaptable to ModifiableValueMap");
        }
        return jobProperties;
    }

    /** Stores the transport error on the job, substituting a default text for a blank reason. */
    private static void putTransportError(ModifiableValueMap jobProperties, String reason) {
        final String effectiveReason = StringUtils.isBlank(reason) ? DEFAULT_TRANSPORT_ERROR : reason;
        jobProperties.put(OffloadingJobProperties.TRANSPORT_ERROR.propertyName(), effectiveReason);
    }

    /** Sets the start time and the worker location on the job and commits. */
    private void markJobOffloadedToWorker(Resource jobResource) throws PersistenceException {
        final ModifiableValueMap jobProperties = this.getModifiableProperties(jobResource);
        jobProperties.put(OffloadingJobProperties.TIME_START.propertyName(), Calendar.getInstance());
        jobProperties.put(OffloadingJobProperties.LOCATION.propertyName(), OffloadingLocations.WORKER.getPropertyValue());
        jobResource.getResourceResolver().commit();
    }

    /** Reverts the worker hand-over: clears the start time, restores the master location, stores the error and commits. */
    private void markJobToWorkerFailed(Resource jobResource, String reason) throws PersistenceException {
        final ModifiableValueMap jobProperties = this.getModifiableProperties(jobResource);
        jobProperties.remove(OffloadingJobProperties.TIME_START.propertyName());
        jobProperties.put(OffloadingJobProperties.LOCATION.propertyName(), OffloadingLocations.MASTER.getPropertyValue());
        putTransportError(jobProperties, reason);
        jobResource.getResourceResolver().commit();
    }

    /** Sets the end time and the master location on the job and commits. */
    private void markJobBackOnMaster(Resource jobResource) throws PersistenceException {
        final ModifiableValueMap jobProperties = this.getModifiableProperties(jobResource);
        jobProperties.put(OffloadingJobProperties.TIME_END.propertyName(), Calendar.getInstance());
        jobProperties.put(OffloadingJobProperties.LOCATION.propertyName(), OffloadingLocations.MASTER.getPropertyValue());
        jobResource.getResourceResolver().commit();
    }

    /** Reverts the master hand-over: clears the end time, restores the worker location, stores the error and commits. */
    private void markJobToMasterFailed(Resource jobResource, String reason) throws PersistenceException {
        final ModifiableValueMap jobProperties = this.getModifiableProperties(jobResource);
        jobProperties.remove(OffloadingJobProperties.TIME_END.propertyName());
        jobProperties.put(OffloadingJobProperties.LOCATION.propertyName(), OffloadingLocations.WORKER.getPropertyValue());
        putTransportError(jobProperties, reason);
        jobResource.getResourceResolver().commit();
    }

    /** Checks that the resource exists and lives below the configured offloading job path. */
    private boolean isOffloadingJob(Resource jobResource) {
        return jobResource != null && jobResource.getPath().startsWith(this.configurator.getOffloadingJobPath());
    }

    /**
     * Opens a service resource resolver for the "offloading" sub service. The caller is
     * responsible for closing it.
     *
     * @throws LoginException if the service resolver cannot be obtained
     */
    private ResourceResolver getServiceResolver() throws LoginException {
        // The factory expects Map<String, Object>; a Map<String, String> is not assignable.
        final Map<String, Object> authenticationInfo = new HashMap<String, Object>();
        authenticationInfo.put(ResourceResolverFactory.SUBSERVICE, SUB_SERVICE_NAME);
        return this.resolverFactory.getServiceResourceResolver(authenticationInfo);
    }

    protected void bindResolverFactory(ResourceResolverFactory resourceResolverFactory) {
        this.resolverFactory = resourceResolverFactory;
    }

    protected void unbindResolverFactory(ResourceResolverFactory resourceResolverFactory) {
        // Only clear the field if it still refers to the service being unbound.
        if (this.resolverFactory == resourceResolverFactory) {
            this.resolverFactory = null;
        }
    }

    protected void bindConfigurator(OffloadingConfigurator offloadingConfigurator) {
        this.configurator = offloadingConfigurator;
    }

    protected void unbindConfigurator(OffloadingConfigurator offloadingConfigurator) {
        // Only clear the field if it still refers to the service being unbound.
        if (this.configurator == offloadingConfigurator) {
            this.configurator = null;
        }
    }
}