// WorkflowOffloadingJobConsumer.java 9.92 KB
/*
 * Decompiled with CFR 0_118.
 * 
 * Could not load the following classes:
 *  com.adobe.granite.workflow.WorkflowException
 *  com.adobe.granite.workflow.WorkflowSession
 *  com.adobe.granite.workflow.exec.Workflow
 *  com.adobe.granite.workflow.exec.WorkflowData
 *  com.adobe.granite.workflow.metadata.MetaDataMap
 *  com.adobe.granite.workflow.model.WorkflowModel
 *  javax.jcr.Session
 *  org.apache.commons.lang.StringUtils
 *  org.apache.felix.scr.annotations.Component
 *  org.apache.felix.scr.annotations.ConfigurationPolicy
 *  org.apache.felix.scr.annotations.Properties
 *  org.apache.felix.scr.annotations.Property
 *  org.apache.felix.scr.annotations.Reference
 *  org.apache.felix.scr.annotations.ReferencePolicy
 *  org.apache.felix.scr.annotations.Service
 *  org.apache.sling.api.adapter.AdapterManager
 *  org.apache.sling.api.resource.LoginException
 *  org.apache.sling.api.resource.ResourceResolver
 *  org.apache.sling.api.resource.ResourceResolverFactory
 *  org.apache.sling.commons.osgi.PropertiesUtil
 *  org.apache.sling.event.jobs.Job
 *  org.apache.sling.event.jobs.JobManager
 *  org.apache.sling.event.jobs.consumer.JobConsumer
 *  org.apache.sling.event.jobs.consumer.JobConsumer$AsyncHandler
 *  org.apache.sling.event.jobs.consumer.JobConsumer$JobResult
 *  org.osgi.service.event.Event
 *  org.osgi.service.event.EventHandler
 *  org.slf4j.Logger
 *  org.slf4j.LoggerFactory
 */
package com.adobe.granite.offloading.workflow.impl;

import com.adobe.granite.offloading.workflow.api.WorkflowOffloadingProperties;
import com.adobe.granite.workflow.WorkflowException;
import com.adobe.granite.workflow.WorkflowSession;
import com.adobe.granite.workflow.exec.Workflow;
import com.adobe.granite.workflow.exec.WorkflowData;
import com.adobe.granite.workflow.metadata.MetaDataMap;
import com.adobe.granite.workflow.model.WorkflowModel;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.jcr.Session;
import org.apache.commons.lang.StringUtils;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.ConfigurationPolicy;
import org.apache.felix.scr.annotations.Properties;
import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferencePolicy;
import org.apache.felix.scr.annotations.Service;
import org.apache.sling.api.adapter.AdapterManager;
import org.apache.sling.api.resource.LoginException;
import org.apache.sling.api.resource.ResourceResolver;
import org.apache.sling.api.resource.ResourceResolverFactory;
import org.apache.sling.commons.osgi.PropertiesUtil;
import org.apache.sling.event.jobs.Job;
import org.apache.sling.event.jobs.JobManager;
import org.apache.sling.event.jobs.consumer.JobConsumer;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Component(metatype=1, name="com.adobe.granite.workflow.core.offloading.WorkflowOffloadingJobConsumer", policy=ConfigurationPolicy.REQUIRE, configurationFactory=1)
@Service
@Properties(value={@Property(name="job.topics", value={"com/adobe/granite/workflow/offloading"}), @Property(name="event.topics", value={"com/adobe/granite/workflow/event"}, propertyPrivate=1), @Property(name="event.filter", value={"(!(event.application=*))"}, propertyPrivate=1)})
public class WorkflowOffloadingJobConsumer
implements JobConsumer,
EventHandler {
    private final Logger log;
    private Map<String, JobConsumer.AsyncHandler> asyncHandlers;
    @Reference
    private ResourceResolverFactory resolverFactory;
    @Reference(policy=ReferencePolicy.STATIC)
    private AdapterManager adapterManager;
    @Reference
    private JobManager jobManager;

    public WorkflowOffloadingJobConsumer() {
        this.log = LoggerFactory.getLogger(this.getClass());
        this.asyncHandlers = new ConcurrentHashMap<String, JobConsumer.AsyncHandler>();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public JobConsumer.JobResult process(Job job) {
        JobConsumer.JobResult result = JobConsumer.JobResult.FAILED;
        String topic = job.getTopic();
        this.log.debug("process topic topic: {}", (Object)topic);
        String workflowModelPath = this.getWorkflowModelPath(job);
        if (StringUtils.isBlank((String)workflowModelPath)) {
            throw new IllegalArgumentException("The workflow model path must not be null");
        }
        String workflowPayload = this.getWorkflowPayload(job);
        if (StringUtils.isBlank((String)workflowPayload)) {
            throw new IllegalArgumentException("The workflow payload  must not be null");
        }
        ResourceResolver resolver = null;
        try {
            WorkflowData wfData;
            JobConsumer.AsyncHandler asyncHandler;
            resolver = this.resolverFactory.getAdministrativeResourceResolver(null);
            Session session = (Session)resolver.adaptTo(Session.class);
            WorkflowSession wfSession = (WorkflowSession)this.adapterManager.getAdapter((Object)session, WorkflowSession.class);
            WorkflowModel wfModel = wfSession.getModel(workflowModelPath);
            if (wfModel != null && (wfData = wfSession.newWorkflowData("JCR_PATH", (Object)workflowPayload)) != null) {
                ConcurrentHashMap<String, String> metaData = new ConcurrentHashMap<String, String>();
                metaData.put("offloading.job.id", job.getId());
                Workflow workflow = wfSession.startWorkflow(wfModel, wfData, metaData);
                this.log.debug("Workflow {} started for job {} with payload {}", (Object[])new String[]{workflow.getId(), job.getId(), workflowPayload});
            }
            if ((asyncHandler = (JobConsumer.AsyncHandler)job.getProperty(":sling:jobs:asynchandler")) != null) {
                this.asyncHandlers.put(job.getId(), asyncHandler);
                result = JobConsumer.JobResult.ASYNC;
            }
            this.log.debug("job {} processed with result {}", (Object)job.getId(), (Object)result.name());
        }
        catch (Exception e) {
            this.log.error(e.getMessage(), (Throwable)e);
            result = JobConsumer.JobResult.FAILED;
        }
        finally {
            if (resolver != null) {
                resolver.close();
            }
        }
        return result;
    }

    private String getWorkflowPayload(Job job) {
        return (String)job.getProperty(WorkflowOffloadingProperties.OFFLOADING_WORKFLOW_PAYLOAD.getPropertyName(), (Object)"");
    }

    private String getWorkflowModelPath(Job job) {
        return (String)job.getProperty(WorkflowOffloadingProperties.OFFLOADING_WORKFLOW_MODEL.getPropertyName(), (Object)"");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void handleEvent(Event event) {
        String workflowId = PropertiesUtil.toString((Object)event.getProperty("WorkflowInstanceId"), (String)"");
        String eventType = PropertiesUtil.toString((Object)event.getProperty("EventType"), (String)"");
        if (StringUtils.isNotBlank((String)workflowId) && StringUtils.isNotBlank((String)eventType)) {
            ResourceResolver resolver = null;
            try {
                resolver = this.resolverFactory.getAdministrativeResourceResolver(null);
                Session session = (Session)resolver.adaptTo(Session.class);
                WorkflowSession wfSession = (WorkflowSession)this.adapterManager.getAdapter((Object)session, WorkflowSession.class);
                Workflow workflow = wfSession.getWorkflow(workflowId);
                if (workflow != null) {
                    MetaDataMap metaData = workflow.getMetaDataMap();
                    String jobId = (String)metaData.get("offloading.job.id", (Object)"");
                    this.log.debug(jobId);
                    JobConsumer.AsyncHandler asyncHandler = this.asyncHandlers.get(jobId);
                    if (asyncHandler != null) {
                        if (eventType.equals("WorkflowCompleted")) {
                            this.log.debug("workflow completed -> async handler ok()");
                            asyncHandler.ok();
                            this.asyncHandlers.remove(jobId);
                        } else if (eventType.equals("WorkflowAborted")) {
                            this.log.debug("workflow aborted ->async handler failed()");
                            asyncHandler.failed();
                            this.asyncHandlers.remove(jobId);
                        }
                    }
                    this.log.debug("async handlers count={}", (Object)this.asyncHandlers.size());
                }
            }
            catch (LoginException e) {
                this.log.error(e.getMessage(), (Throwable)e);
            }
            catch (WorkflowException e) {
                this.log.error(e.getMessage(), (Throwable)e);
            }
            finally {
                if (resolver != null) {
                    resolver.close();
                    resolver = null;
                }
            }
        }
    }

    protected void bindResolverFactory(ResourceResolverFactory resourceResolverFactory) {
        this.resolverFactory = resourceResolverFactory;
    }

    protected void unbindResolverFactory(ResourceResolverFactory resourceResolverFactory) {
        if (this.resolverFactory == resourceResolverFactory) {
            this.resolverFactory = null;
        }
    }

    protected void bindAdapterManager(AdapterManager adapterManager) {
        this.adapterManager = adapterManager;
    }

    protected void unbindAdapterManager(AdapterManager adapterManager) {
        if (this.adapterManager == adapterManager) {
            this.adapterManager = null;
        }
    }

    protected void bindJobManager(JobManager jobManager) {
        this.jobManager = jobManager;
    }

    protected void unbindJobManager(JobManager jobManager) {
        if (this.jobManager == jobManager) {
            this.jobManager = null;
        }
    }
}