CQWorkflowStatisticsService.java 13 KB
/*
 * Decompiled with CFR 0_118.
 * 
 * Could not load the following classes:
 *  com.day.cq.statistics.StatisticsService
 *  com.day.cq.workflow.WorkflowException
 *  com.day.cq.workflow.WorkflowService
 *  com.day.cq.workflow.WorkflowSession
 *  com.day.cq.workflow.event.WorkflowEvent
 *  com.day.cq.workflow.exec.HistoryItem
 *  com.day.cq.workflow.exec.WorkItem
 *  com.day.cq.workflow.exec.Workflow
 *  com.day.cq.workflow.model.WorkflowModel
 *  com.day.cq.workflow.statistics.WorkflowStatisticService
 *  com.day.crx.statistics.Entry
 *  com.day.crx.statistics.Report
 *  javax.jcr.RepositoryException
 *  javax.jcr.Session
 *  org.apache.felix.scr.annotations.Activate
 *  org.apache.felix.scr.annotations.Component
 *  org.apache.felix.scr.annotations.Deactivate
 *  org.apache.felix.scr.annotations.Properties
 *  org.apache.felix.scr.annotations.Property
 *  org.apache.felix.scr.annotations.Reference
 *  org.apache.felix.scr.annotations.ReferencePolicy
 *  org.apache.felix.scr.annotations.Service
 *  org.apache.sling.commons.osgi.PropertiesUtil
 *  org.apache.sling.jcr.api.SlingRepository
 *  org.osgi.service.component.ComponentContext
 *  org.osgi.service.event.Event
 *  org.osgi.service.event.EventHandler
 *  org.slf4j.Logger
 *  org.slf4j.LoggerFactory
 */
package com.day.cq.workflow.impl.statistics;

import com.day.cq.statistics.StatisticsService;
import com.day.cq.workflow.WorkflowException;
import com.day.cq.workflow.WorkflowService;
import com.day.cq.workflow.WorkflowSession;
import com.day.cq.workflow.event.WorkflowEvent;
import com.day.cq.workflow.exec.HistoryItem;
import com.day.cq.workflow.exec.WorkItem;
import com.day.cq.workflow.exec.Workflow;
import com.day.cq.workflow.impl.ServiceLoginUtil;
import com.day.cq.workflow.impl.statistics.WorkflowInstanceReport;
import com.day.cq.workflow.impl.statistics.WorkflowInstanceView;
import com.day.cq.workflow.impl.statistics.WorkflowReport;
import com.day.cq.workflow.impl.statistics.WorkflowView;
import com.day.cq.workflow.model.WorkflowModel;
import com.day.cq.workflow.statistics.WorkflowStatisticService;
import com.day.crx.statistics.Entry;
import com.day.crx.statistics.Report;
import java.util.Dictionary;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import javax.jcr.RepositoryException;
import javax.jcr.Session;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Properties;
import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferencePolicy;
import org.apache.felix.scr.annotations.Service;
import org.apache.sling.commons.osgi.PropertiesUtil;
import org.apache.sling.jcr.api.SlingRepository;
import org.osgi.service.component.ComponentContext;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Component(metatype=1, immediate=1)
@Service(value={EventHandler.class, WorkflowStatisticService.class})
@Properties(value={@Property(name="event.topics", value={"com/day/cq/workflow/event"}, propertyPrivate=1), @Property(name="event.filter", value={"(!(event.application=*))"}, propertyPrivate=1)})
public class CQWorkflowStatisticsService
implements WorkflowStatisticService,
EventHandler {
    private static final Logger log = LoggerFactory.getLogger(CQWorkflowStatisticsService.class);
    private static final String WORKFLOW_STATS_PREFIX = "/var/statistics/workflows";
    private static final String WORKFLOW_STATISTICS_SERVICE_STOP = "WORKFLOW_STATISTICS_SERVICE_STOP";
    @Reference(policy=ReferencePolicy.STATIC)
    private WorkflowService workflowService = null;
    @Reference
    private StatisticsService statisticsService = null;
    @Reference
    private SlingRepository repository = null;
    @Property(description="Indicates if the workflow stats collection should be enabled", label="enable workflow stats collection", boolValue={1})
    private static final String PROP_ENABLED = "stats.enabled";
    private boolean statsCollectionEnabled = true;
    private final BlockingQueue<WorkflowEvent> queue = new LinkedBlockingQueue<WorkflowEvent>();
    private EventProcessor processor = null;

    @Activate
    protected void activate(ComponentContext context) {
        this.statsCollectionEnabled = PropertiesUtil.toBoolean(context.getProperties().get("stats.enabled"), (boolean)true);
        if (this.statsCollectionEnabled) {
            this.processor = new EventProcessor();
            Thread thread = new Thread((Runnable)this.processor, this.getClass().getSimpleName() + "-Processor");
            thread.setDaemon(true);
            thread.start();
        }
        log.debug("Stats activate.  Enabled is: {} processor is: {}", (Object)this.statsCollectionEnabled, (Object)this.processor);
    }

    @Deactivate
    protected void deactivate(ComponentContext context) {
        log.debug("Stats deactivate.  Enabled is: {} processor is: {}", (Object)this.statsCollectionEnabled, (Object)this.processor);
        if (this.processor != null) {
            this.processor.stop();
            Hashtable<String, Boolean> props = new Hashtable<String, Boolean>();
            props.put("WORKFLOW_STATISTICS_SERVICE_STOP", true);
            this.queue.offer(new WorkflowEvent(props));
        }
    }

    public Map getReport() {
        throw new UnsupportedOperationException("This method is no longer supported.  Please use getReport(Session)");
    }

    public Map getReport(Session session) {
        HashMap<String, Object> workflowStats = new HashMap<String, Object>();
        try {
            Iterator itr = this.statisticsService.runReport(session, (Report)new WorkflowReport("/var/statistics/workflows"));
            Object[] stats = (Object[])itr.next();
            workflowStats.put("Workflow Model Id", stats[0]);
            workflowStats.put("Total Count", stats[1]);
            workflowStats.put("Total Time", stats[2]);
            workflowStats.put("Average Throughput Time", stats[3]);
            workflowStats.put("Minimal Throughput Time", stats[4]);
            workflowStats.put("Maximal Throughput Time", stats[5]);
            workflowStats.put("Number of Running Instances", stats[6]);
            workflowStats.put("Number of Aborted Instances", stats[7]);
            workflowStats.put("Number of Completed Instances", stats[8]);
            workflowStats.put("Number of Suspended Instances", stats[9]);
        }
        catch (RepositoryException re) {
            log.warn("Unable to fetch workflow report: " + (Object)re);
        }
        return workflowStats;
    }

    public Map getInstanceReport(WorkflowModel model) {
        throw new UnsupportedOperationException("This method is no longer supported.  Please use getInstanceReport(Session, WorkflowModel");
    }

    public Map getInstanceReport(Session session, WorkflowModel model) {
        Map workflowStats = new HashMap();
        try {
            Iterator itr = this.statisticsService.runReport(session, (Report)new WorkflowInstanceReport("/var/statistics/workflows", model));
            workflowStats = (Map)itr.next();
        }
        catch (RepositoryException re) {
            log.warn("Unable to fetch workflow instance report for " + model.getId() + ": " + (Object)re);
        }
        return workflowStats;
    }

    public void handleEvent(Event event) {
        if (this.statsCollectionEnabled && "com/day/cq/workflow/event".equals(event.getTopic())) {
            try {
                this.queue.put((WorkflowEvent)event);
                if (log.isDebugEnabled()) {
                    log.info("handleEvent: put event [{}] in queue, size now [{}]", event.getProperty("EventType"), (Object)this.queue.size());
                }
            }
            catch (InterruptedException e) {
                log.debug("Queue put interrupted");
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void processEvent(WorkflowEvent event) {
        block16 : {
            WorkflowSession wfSession = this.getWorkflowSession();
            try {
                Object eventType = event.getProperty("EventType");
                String instanceId = (String)event.getProperty("WorkflowInstanceId");
                Workflow instance = null;
                try {
                    instance = instanceId != null ? wfSession.getWorkflow(instanceId) : null;
                }
                catch (WorkflowException we) {
                    log.warn("Unable to load workflow instance {}", (Object)instanceId, (Object)we);
                }
                if (instance == null) break block16;
                try {
                    WorkflowView view;
                    if (eventType.equals("WorkflowStarted") || eventType.equals("WorkflowResumed") || eventType.equals("WorkflowSuspended")) {
                        view = new WorkflowView("/var/statistics/workflows", instance, eventType);
                        this.statisticsService.addEntry((Entry)view);
                    } else if (eventType.equals("WorkflowAborted") || eventType.equals("WorkflowCompleted")) {
                        view = new WorkflowView("/var/statistics/workflows", instance, eventType);
                        this.statisticsService.addEntry((Entry)view);
                        try {
                            List historyItems = wfSession.getHistory(instance);
                            WorkflowInstanceView iView = new WorkflowInstanceView("/var/statistics/workflows", instance.getWorkflowModel(), historyItems, null);
                            this.statisticsService.addEntry((Entry)iView);
                        }
                        catch (Throwable t) {
                            log.warn("Cannot create view", (Object)t.getMessage());
                        }
                    }
                    if (eventType.equals("NodeTransition")) {
                        WorkItem currentItem = (WorkItem)event.getProperty("Workitem");
                        try {
                            WorkflowInstanceView view2 = new WorkflowInstanceView("/var/statistics/workflows", instance.getWorkflowModel(), wfSession.getHistory(instance), currentItem);
                            this.statisticsService.addEntry((Entry)view2);
                        }
                        catch (Throwable t) {
                            log.warn("Cannot create view", (Object)t.getMessage());
                        }
                    }
                }
                catch (RepositoryException re) {
                    log.warn("Unable to update statistics: " + re.getMessage());
                }
            }
            finally {
                if (wfSession != null) {
                    wfSession.logout();
                }
            }
        }
    }

    private WorkflowSession getWorkflowSession() {
        Session session = ServiceLoginUtil.createWorkflowServiceSession(this.repository);
        return this.workflowService.getWorkflowSession(session);
    }

    protected void bindWorkflowService(WorkflowService workflowService) {
        this.workflowService = workflowService;
    }

    protected void unbindWorkflowService(WorkflowService workflowService) {
        if (this.workflowService == workflowService) {
            this.workflowService = null;
        }
    }

    protected void bindStatisticsService(StatisticsService statisticsService) {
        this.statisticsService = statisticsService;
    }

    protected void unbindStatisticsService(StatisticsService statisticsService) {
        if (this.statisticsService == statisticsService) {
            this.statisticsService = null;
        }
    }

    protected void bindRepository(SlingRepository slingRepository) {
        this.repository = slingRepository;
    }

    protected void unbindRepository(SlingRepository slingRepository) {
        if (this.repository == slingRepository) {
            this.repository = null;
        }
    }

    private class EventProcessor
    implements Runnable {
        private volatile boolean isRunning;

        private EventProcessor() {
            this.isRunning = true;
        }

        public void stop() {
            this.isRunning = false;
        }

        @Override
        public void run() {
            log.debug("Workflow Statistics Processor started");
            while (this.isRunning) {
                try {
                    WorkflowEvent event = (WorkflowEvent)CQWorkflowStatisticsService.this.queue.take();
                    log.debug("Workflow Statistics processor. Processing {} type: {} ", (Object)event.getTopic(), event.getProperty("EventType"));
                    CQWorkflowStatisticsService.this.processEvent(event);
                }
                catch (InterruptedException e) {
                    log.debug("Workflow Statistics processor interrupted");
                }
            }
            log.debug("Workflow Statistics Processor done");
        }
    }

}