OffloadingJobCloner.java 19.4 KB
/*
 * Decompiled with CFR 0_118.
 * 
 * Could not load the following classes:
 *  org.apache.commons.lang.StringUtils
 *  org.apache.felix.scr.annotations.Activate
 *  org.apache.felix.scr.annotations.Component
 *  org.apache.felix.scr.annotations.Deactivate
 *  org.apache.felix.scr.annotations.Properties
 *  org.apache.felix.scr.annotations.Property
 *  org.apache.felix.scr.annotations.Reference
 *  org.apache.felix.scr.annotations.Service
 *  org.apache.sling.api.resource.LoginException
 *  org.apache.sling.api.resource.PersistenceException
 *  org.apache.sling.api.resource.Resource
 *  org.apache.sling.api.resource.ResourceResolver
 *  org.apache.sling.api.resource.ResourceResolverFactory
 *  org.apache.sling.api.resource.ResourceUtil
 *  org.apache.sling.api.resource.ValueMap
 *  org.apache.sling.api.wrappers.ValueMapDecorator
 *  org.apache.sling.discovery.TopologyEvent
 *  org.apache.sling.discovery.TopologyEventListener
 *  org.osgi.service.event.Event
 *  org.osgi.service.event.EventHandler
 *  org.slf4j.Logger
 *  org.slf4j.LoggerFactory
 */
package com.adobe.granite.offloading.impl;

import com.adobe.granite.offloading.api.OffloadingJobProperties;
import com.adobe.granite.offloading.impl.OffloadingConfigurator;
import com.adobe.granite.offloading.impl.OffloadingEventProcessor;
import com.adobe.granite.offloading.impl.OffloadingLocations;
import com.adobe.granite.offloading.impl.OffloadingProcessor;
import com.adobe.granite.offloading.impl.OffloadingStatus;
import com.adobe.granite.offloading.impl.util.OffloadingResourceUtil;
import com.adobe.granite.offloading.impl.util.OffloadingUtil;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang.StringUtils;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Properties;
import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.Service;
import org.apache.sling.api.resource.LoginException;
import org.apache.sling.api.resource.PersistenceException;
import org.apache.sling.api.resource.Resource;
import org.apache.sling.api.resource.ResourceResolver;
import org.apache.sling.api.resource.ResourceResolverFactory;
import org.apache.sling.api.resource.ResourceUtil;
import org.apache.sling.api.resource.ValueMap;
import org.apache.sling.api.wrappers.ValueMapDecorator;
import org.apache.sling.discovery.TopologyEvent;
import org.apache.sling.discovery.TopologyEventListener;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Component(metatype=1, label="Adobe Granite Offloading - Job Cloner")
@Service
@Properties(value={@Property(name="event.topics", value={"org/apache/sling/api/resource/Resource/ADDED", "org/apache/sling/api/resource/Resource/CHANGED", "org/apache/sling/api/resource/Resource/REMOVED", "org/apache/sling/event/notification/job/*"}, propertyPrivate=1), @Property(name="event.filter", value={"(!(event.application=*))"}, propertyPrivate=1)})
public class OffloadingJobCloner
implements TopologyEventListener,
EventHandler,
OffloadingEventProcessor {
    private final Logger log;
    @Reference
    private ResourceResolverFactory resolverFactory;
    @Reference
    private OffloadingConfigurator configurator;
    private OffloadingProcessor offloadingProcessor;
    private static final String SUB_SERVICE_NAME = "jobcloner";
    private static final boolean JOBCLONER_ENABLED_DEFAULT = false;
    private boolean jobClonerEnabled;
    @Property(label="Job Cloner Enabled", description="Allows to enable/disable the job cloner in general.", boolValue={0})
    private static final String JOBCLONER_ENABLED = "offloading.jobcloner.enabled";

    public OffloadingJobCloner() {
        this.log = LoggerFactory.getLogger(this.getClass());
        this.jobClonerEnabled = false;
    }

    @Activate
    protected void activate(Map<String, Object> props) {
        this.offloadingProcessor = new OffloadingProcessor(this, this.configurator, true, "Adobe Granite Offloading: Job Cloner");
        this.modified(props);
    }

    protected void modified(Map<String, Object> props) {
        ValueMapDecorator properties = new ValueMapDecorator(props);
        this.jobClonerEnabled = (Boolean)properties.get("offloading.jobcloner.enabled", (Object)false);
        if (this.jobClonerEnabled) {
            this.offloadingProcessor.start();
        } else {
            this.offloadingProcessor.stop();
        }
    }

    @Deactivate
    protected void deactivate() {
        this.offloadingProcessor.stop();
        this.offloadingProcessor = null;
    }

    public void handleEvent(Event event) {
        if (this.jobClonerEnabled) {
            this.offloadingProcessor.handleEvent(event);
        }
    }

    public void handleTopologyEvent(TopologyEvent event) {
        if (this.jobClonerEnabled) {
            this.offloadingProcessor.handleTopologyEvent(event);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void processEvent(Event event) {
        String topic = event.getTopic();
        String path = (String)event.getProperty("path");
        this.log.debug("process event with topic {} on path {}", (Object)topic, (Object)path);
        ResourceResolver resolver = null;
        try {
            resolver = this.getServiceResolver();
            if (this.isJobNotificationEvent(event)) {
                Resource offloadingResource = OffloadingResourceUtil.getJobResourceFromJobNotification(resolver, event, this.configurator.getOffloadingJobPath());
                if (offloadingResource != null) {
                    if (topic.equals("org/apache/sling/event/notification/job/START")) {
                        this.jobStarted(offloadingResource);
                    } else if (topic.equals("org/apache/sling/event/notification/job/FINISHED")) {
                        this.jobFinished(offloadingResource);
                    } else if (topic.equals("org/apache/sling/event/notification/job/CANCELLED")) {
                        this.jobCancelled(offloadingResource);
                    } else if (topic.equals("org/apache/sling/event/notification/job/FAILED")) {
                        this.jobFailed(offloadingResource);
                    }
                }
            } else if (this.offloadingProcessor.isReassignEvent(event)) {
                this.handleReassignEvent(resolver, event);
            } else {
                Resource jobResource = OffloadingResourceUtil.getJobResourceFromEvent(resolver, event);
                if (jobResource != null) {
                    if (this.isSyncJob(jobResource)) {
                        this.syncJobResource(jobResource);
                    } else if (this.isSyncOffloadingJob(jobResource)) {
                        this.syncOffloadingJobResource(jobResource);
                    }
                }
            }
            if (resolver.hasChanges()) {
                resolver.commit();
            }
        }
        catch (Exception e) {
            this.log.error(e.getMessage(), (Throwable)e);
        }
        finally {
            if (resolver != null) {
                resolver.close();
            }
        }
    }

    @Override
    public boolean doHandleEvent(Event event) {
        return this.isJobNotificationEvent(event) || this.isEventForJobCloner(event);
    }

    private ResourceResolver getServiceResolver() throws LoginException {
        HashMap<String, String> authenticationInfo = new HashMap<String, String>();
        authenticationInfo.put("sling.service.subservice", "jobcloner");
        return this.resolverFactory.getServiceResourceResolver(authenticationInfo);
    }

    private void handleReassignEvent(ResourceResolver resourceResolver, Event event) throws PersistenceException {
        String path = (String)event.getProperty("path");
        String topic = event.getTopic();
        this.log.debug("handle reassign event for topic {} on path {}", (Object)topic, (Object)path);
        String offloadingJobPath = OffloadingUtil.convertSlingJobPathToOffloadingJobPath(this.configurator.getSlingJobPath(), this.configurator.getOffloadingJobPath(), path);
        Resource offloadingJob = resourceResolver.getResource(offloadingJobPath);
        if (offloadingJob != null) {
            this.log.debug("delete reassigned job at path {}", (Object)offloadingJobPath);
            resourceResolver.delete(offloadingJob);
        }
    }

    private boolean isSyncOffloadingJob(Resource jobResource) {
        boolean isSyncOffloadingJob = false;
        if (jobResource != null) {
            String path = jobResource.getPath();
            String type = jobResource.getResourceType();
            isSyncOffloadingJob = path.startsWith(this.configurator.getOffloadingJobPath()) && type.equals(this.configurator.getJobType());
        }
        return isSyncOffloadingJob;
    }

    private boolean isSyncJob(Resource jobResource) {
        boolean isSyncJob = false;
        if (jobResource != null) {
            ValueMap props = ResourceUtil.getValueMap((Resource)jobResource);
            String path = jobResource.getPath();
            String type = jobResource.getResourceType();
            String startedTime = (String)props.get("event.job.started.time", (Object)"");
            isSyncJob = path.startsWith(this.configurator.getSlingJobPath()) && type.equals(this.configurator.getJobType()) && StringUtils.isBlank((String)startedTime);
        }
        return isSyncJob;
    }

    private boolean isJobNotificationEvent(Event event) {
        return event.getTopic().startsWith("org/apache/sling/event/notification/job");
    }

    private boolean isEventForJobCloner(Event event) {
        String path = (String)event.getProperty("path");
        if (event.getTopic().equals("org/apache/sling/api/resource/Resource/REMOVED")) {
            return path.startsWith(this.configurator.getSlingJobPath());
        }
        String resourceType = (String)event.getProperty("resourceType");
        return StringUtils.isNotBlank((String)resourceType) && resourceType.equals(this.configurator.getJobType()) && (path.startsWith(this.configurator.getOffloadingJobPath()) || path.startsWith(this.configurator.getSlingJobPath()));
    }

    private void syncJobResource(Resource jobResource) {
        ValueMap jobProperties = ResourceUtil.getValueMap((Resource)jobResource);
        String targetInstance = (String)jobProperties.get("event.job.application", (Object)"");
        if (this.offloadingProcessor.isOffloadingInstance(targetInstance)) {
            OffloadingResourceUtil.markJobStarted(jobResource);
            try {
                this.syncNewJob(jobResource);
            }
            catch (PersistenceException e) {
                this.log.error("Error cloning job {}", (Object)e.getMessage(), (Object)e);
                OffloadingResourceUtil.markJobNotStarted(jobResource);
            }
        }
    }

    private void syncOffloadingJobResource(Resource offloadingJobResource) throws PersistenceException {
        ValueMap jobProperties = ResourceUtil.getValueMap((Resource)offloadingJobResource);
        String status = (String)jobProperties.get(OffloadingJobProperties.STATUS.propertyName(), (Object)"");
        if (this.onMaster(jobProperties)) {
            if (status.equals(OffloadingStatus.FINISHED.getStatusValue())) {
                this.offloadingJobFinished(offloadingJobResource);
            } else if (status.equals(OffloadingStatus.CANCELLED.getStatusValue())) {
                this.offloadingJobCancelled(offloadingJobResource);
            } else if (status.equals(OffloadingStatus.FAILED.getStatusValue())) {
                this.offloadingJobFailed(offloadingJobResource);
            }
        } else if (this.onWorker(jobProperties)) {
            if (status.equals(OffloadingStatus.NEW.getStatusValue())) {
                this.newOffloadingJob(offloadingJobResource);
            } else if (status.equals(OffloadingStatus.STARTED.getStatusValue())) {
                this.checkOffloadingInProgress(offloadingJobResource);
            }
        } else {
            this.log.debug("Not able to determine if job is currently on master or worker.");
        }
    }

    private boolean onWorker(ValueMap jobProperties) {
        String location = (String)jobProperties.get(OffloadingJobProperties.LOCATION.propertyName(), (Object)"");
        String targetInstance = (String)jobProperties.get("event.job.application", (Object)"");
        String selfInstance = this.offloadingProcessor.getSelfInstanceId();
        return location.equals(OffloadingLocations.WORKER.getPropertyValue()) && targetInstance.equals(selfInstance);
    }

    private boolean onMaster(ValueMap jobProperties) {
        String location = (String)jobProperties.get(OffloadingJobProperties.LOCATION.propertyName(), (Object)"");
        String targetInstance = (String)jobProperties.get("event.job.application", (Object)"");
        String selfInstance = this.offloadingProcessor.getSelfInstanceId();
        return location.equals(OffloadingLocations.MASTER.getPropertyValue()) && !targetInstance.equals(selfInstance);
    }

    private void newOffloadingJob(Resource offloadingJobResource) throws PersistenceException {
        this.log.debug("new offloading job {}", (Object)offloadingJobResource.getPath());
        String jobPath = this.getJobPath(offloadingJobResource);
        ResourceResolver resolver = offloadingJobResource.getResourceResolver();
        Resource jobResource = resolver.getResource(jobPath);
        if (jobResource == null) {
            OffloadingResourceUtil.markOffloadingJobStarted(offloadingJobResource);
            this.createJobResource(jobPath, offloadingJobResource);
        }
    }

    private void checkOffloadingInProgress(Resource offloadingJobResource) {
        this.log.debug("check offloading in progress {}", (Object)offloadingJobResource.getPath());
        String jobPath = this.getJobPath(offloadingJobResource);
        Resource jobResource = offloadingJobResource.getResourceResolver().getResource(jobPath);
        if (jobResource == null) {
            OffloadingResourceUtil.markOffloadingJobFailed(offloadingJobResource);
        }
    }

    private void offloadingJobFailed(Resource offloadingJobResource) throws PersistenceException {
        this.log.debug("offloading job failed {}", (Object)offloadingJobResource.getPath());
        OffloadingResourceUtil.deleteJob(offloadingJobResource, this.configurator.getSlingJobPath(), this.configurator.getOffloadingJobPath());
    }

    private void offloadingJobCancelled(Resource offloadingJobResource) throws PersistenceException {
        this.log.debug("offloading job cancelled {}", (Object)offloadingJobResource.getPath());
        OffloadingResourceUtil.deleteJob(offloadingJobResource, this.configurator.getSlingJobPath(), this.configurator.getOffloadingJobPath());
    }

    private void offloadingJobFinished(Resource offloadingJobResource) throws PersistenceException {
        this.log.debug("offloadin\u0192g job finished {}", (Object)offloadingJobResource.getPath());
        OffloadingResourceUtil.deleteJob(offloadingJobResource, this.configurator.getSlingJobPath(), this.configurator.getOffloadingJobPath());
    }

    private void createJobResource(String jobPath, Resource offloadingJobResource) throws PersistenceException {
        this.log.debug("create new job resource {}", (Object)jobPath);
        String offloadingJobParentPath = ResourceUtil.getParent((String)jobPath);
        Resource jobParentResource = ResourceUtil.getOrCreateResource((ResourceResolver)offloadingJobResource.getResourceResolver(), (String)offloadingJobParentPath, (String)"sling:Folder", (String)"sling:Folder", (boolean)true);
        HashMap<String, Object> props = new HashMap<String, Object>();
        ValueMap jobProperties = ResourceUtil.getValueMap((Resource)offloadingJobResource);
        Set<String> filteredKeys = OffloadingUtil.filterKeySet(jobProperties.keySet(), Arrays.asList("jcr:created", "jcr:createdBy", "event.job.started.time"));
        for (String key : filteredKeys) {
            props.put(key, jobProperties.get((Object)key));
        }
        Resource job = jobParentResource.getResourceResolver().create(jobParentResource, offloadingJobResource.getName(), props);
        if (job != null) {
            this.log.debug("new job resource  created at {}", (Object)job.getPath());
        }
    }

    private void syncNewJob(Resource jobResource) throws PersistenceException {
        this.log.debug("cloning job {}", (Object)jobResource.getPath());
        String offloadingJobPath = this.getOffloadingJobPath(jobResource);
        String offloadingJobParentPath = ResourceUtil.getParent((String)offloadingJobPath);
        Resource offloadingJobParentResource = ResourceUtil.getOrCreateResource((ResourceResolver)jobResource.getResourceResolver(), (String)offloadingJobParentPath, (String)"sling:Folder", (String)"sling:Folder", (boolean)true);
        HashMap<String, Object> props = new HashMap<String, Object>();
        ValueMap jobProperties = ResourceUtil.getValueMap((Resource)jobResource);
        for (String key : jobProperties.keySet()) {
            if ("jcr:created".endsWith(key) || "jcr:createdBy".equals(key)) continue;
            props.put(key, jobProperties.get((Object)key));
        }
        props.put(OffloadingJobProperties.STATUS.propertyName(), OffloadingStatus.NEW.getStatusValue());
        props.put(OffloadingJobProperties.LOCATION.propertyName(), OffloadingLocations.MASTER.getPropertyValue());
        Resource clone = jobResource.getResourceResolver().create(offloadingJobParentResource, jobResource.getName(), props);
        if (clone != null) {
            this.log.debug("clone created at {}", (Object)clone.getPath());
        }
    }

    private String getOffloadingJobPath(Resource jobResource) {
        String jobPath = jobResource.getPath();
        return jobPath.replace(this.configurator.getSlingJobPath(), this.configurator.getOffloadingJobPath());
    }

    private String getJobPath(Resource offloadingJobResource) {
        String offloadingJobPath = offloadingJobResource.getPath();
        return offloadingJobPath.replace(this.configurator.getOffloadingJobPath(), this.configurator.getSlingJobPath());
    }

    private void jobFailed(Resource jobResource) {
        OffloadingResourceUtil.markOffloadingJobFailed(jobResource);
    }

    private void jobCancelled(Resource jobResource) {
        OffloadingResourceUtil.markOffloadingJobCancelled(jobResource);
    }

    private void jobFinished(Resource jobResource) {
        OffloadingResourceUtil.markOffloadingJobFinished(jobResource);
    }

    private void jobStarted(Resource jobResource) {
        OffloadingResourceUtil.markOffloadingJobStarted(jobResource);
    }

    protected void bindResolverFactory(ResourceResolverFactory resourceResolverFactory) {
        this.resolverFactory = resourceResolverFactory;
    }

    protected void unbindResolverFactory(ResourceResolverFactory resourceResolverFactory) {
        if (this.resolverFactory == resourceResolverFactory) {
            this.resolverFactory = null;
        }
    }

    protected void bindConfigurator(OffloadingConfigurator offloadingConfigurator) {
        this.configurator = offloadingConfigurator;
    }

    protected void unbindConfigurator(OffloadingConfigurator offloadingConfigurator) {
        if (this.configurator == offloadingConfigurator) {
            this.configurator = null;
        }
    }
}