AbstractImporter.java 14.2 KB
/*
 * Decompiled with CFR 0_118.
 * 
 * Could not load the following classes:
 *  aQute.bnd.annotation.ConsumerType
 *  com.adobe.granite.workflow.WorkflowException
 *  com.adobe.granite.workflow.launcher.ConfigEntry
 *  com.adobe.granite.workflow.launcher.WorkflowLauncher
 *  com.day.cq.commons.jcr.JcrObservationThrottle
 *  com.day.cq.commons.jcr.JcrUtil
 *  com.day.cq.tagging.InvalidTagFormatException
 *  com.day.cq.tagging.Tag
 *  com.day.cq.tagging.TagManager
 *  javax.jcr.Node
 *  javax.jcr.NodeIterator
 *  javax.jcr.Property
 *  javax.jcr.RepositoryException
 *  javax.jcr.Session
 *  org.apache.commons.lang.StringUtils
 *  org.apache.felix.scr.annotations.Activate
 *  org.apache.felix.scr.annotations.Component
 *  org.apache.felix.scr.annotations.Property
 *  org.apache.felix.scr.annotations.Reference
 *  org.apache.felix.scr.annotations.Service
 *  org.apache.jackrabbit.commons.JcrUtils
 *  org.apache.sling.api.SlingHttpServletResponse
 *  org.apache.sling.api.resource.Resource
 *  org.apache.sling.api.resource.ResourceResolver
 *  org.apache.sling.commons.osgi.PropertiesUtil
 *  org.osgi.service.component.ComponentContext
 *  org.osgi.service.event.Event
 *  org.osgi.service.event.EventAdmin
 *  org.slf4j.Logger
 *  org.slf4j.LoggerFactory
 */
package com.adobe.cq.commerce.pim.common;

import aQute.bnd.annotation.ConsumerType;
import com.adobe.granite.workflow.WorkflowException;
import com.adobe.granite.workflow.launcher.ConfigEntry;
import com.adobe.granite.workflow.launcher.WorkflowLauncher;
import com.day.cq.commons.jcr.JcrObservationThrottle;
import com.day.cq.commons.jcr.JcrUtil;
import com.day.cq.tagging.InvalidTagFormatException;
import com.day.cq.tagging.Tag;
import com.day.cq.tagging.TagManager;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Dictionary;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.jcr.Node;
import javax.jcr.NodeIterator;
import javax.jcr.RepositoryException;
import javax.jcr.Session;
import org.apache.commons.lang.StringUtils;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.Service;
import org.apache.jackrabbit.commons.JcrUtils;
import org.apache.sling.api.SlingHttpServletResponse;
import org.apache.sling.api.resource.Resource;
import org.apache.sling.api.resource.ResourceResolver;
import org.apache.sling.commons.osgi.PropertiesUtil;
import org.osgi.service.component.ComponentContext;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventAdmin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Abstract base for commerce importers.
 *
 * <p>{@link #run} sets up the store root node, disables the workflow launchers selected by
 * {@link #disableWorkflowPredicate}, opens an observation throttle, delegates to
 * {@link #doImport} and finally flushes pending work and re-enables the launchers.
 * Subclasses call {@link #checkpoint} while importing to batch session saves, throttle
 * waits and queued OSGi events.</p>
 *
 * <p>All per-import state (counters, messages, ticker, event queues) lives in plain instance
 * fields without any synchronization.</p>
 */
@Component(componentAbstract=true, metatype=true)
@Service
@ConsumerType
public abstract class AbstractImporter {
    private static final Logger log = LoggerFactory.getLogger(AbstractImporter.class);

    @Reference
    EventAdmin eventAdmin = null;

    /** Checkpoints between session saves; configured in {@link #activate}. */
    private int saveBatchSize;
    private static final int DEFAULT_SAVE_BATCH_SIZE = 1000;
    @Property(label="Save Batch Size", description="Approximate number of nodes to batch between session saves", intValue={DEFAULT_SAVE_BATCH_SIZE})
    public static final String SAVE_BATCH_SIZE_PROP_NAME = "cq.commerce.importer.savebatchsize";

    /** Checkpoints between waits on the observation throttle; configured in {@link #activate}. */
    private int throttleBatchSize;
    private static final int DEFAULT_THROTTLE_BATCH_SIZE = 50000;
    @Property(label="Throttle Batch Size", description="Approximate number of nodes between pauses for observation manager", intValue={DEFAULT_THROTTLE_BATCH_SIZE})
    public static final String THROTTLE_BATCH_SIZE_PROP_NAME = "cq.commerce.importer.throttlebatchsize";

    /** Queued paths per event topic above which the queue is posted even without a flush. */
    private int eventBatchSize = 1000;

    /** Maximum number of messages retained for the response; configured in {@link #activate}. */
    private int messageCap;
    private static final int DEFAULT_MESSAGE_CAP = 1000;
    @Property(label="Message Cap", description="Maximum number of messages to return in response", intValue={DEFAULT_MESSAGE_CAP})
    public static final String MESSAGE_CAP_PROP_NAME = "cq.commerce.importer.messagecap";

    private int saveBatchCount = 0;
    private int throttleBatchCount = 0;
    private JcrObservationThrottle throttle = null;
    /** Ids of the launcher entries this importer disabled and still has to re-enable. */
    private Set<String> disabledWorkflows = new HashSet<String>();
    /** Event topic -> paths queued for that topic. */
    private Map<String, Set<String>> eventQueues = new HashMap<String, Set<String>>();
    // Initialized eagerly so logMessage()/respondWithMessages() are safe before run() is called.
    private List<String> messages = new ArrayList<String>();
    private int errorCount;
    private String tickerToken = null;
    private String tickerMessage;
    private boolean tickerComplete;

    /**
     * Reads the batch sizes and the message cap from the component configuration, falling back
     * to the {@code DEFAULT_*} constants.
     *
     * @param ctx the component context supplying the configuration properties
     * @throws Exception declared for the benefit of overriding subclasses
     */
    @Activate
    protected void activate(ComponentContext ctx) throws Exception {
        Dictionary<?, ?> config = ctx.getProperties();
        this.saveBatchSize = PropertiesUtil.toInteger(config.get(SAVE_BATCH_SIZE_PROP_NAME), DEFAULT_SAVE_BATCH_SIZE);
        this.throttleBatchSize = PropertiesUtil.toInteger(config.get(THROTTLE_BATCH_SIZE_PROP_NAME), DEFAULT_THROTTLE_BATCH_SIZE);
        this.messageCap = PropertiesUtil.toInteger(config.get(MESSAGE_CAP_PROP_NAME), DEFAULT_MESSAGE_CAP);
    }

    /**
     * Runs one import: resets the per-run state, prepares the store, disables workflows, opens
     * the throttle and calls {@link #doImport}. Exceptions are logged, not propagated.
     * Afterwards the ticker is marked complete, a flushing checkpoint is performed, the
     * throttle is closed and the disabled workflows are re-enabled.
     *
     * @param resourceResolver  resolver used for all repository access
     * @param basePath          path under which the store lives
     * @param storeName         optional store name; appended (mangled) to {@code basePath} when non-empty
     * @param incrementalImport when {@code false}, existing children of the store root are removed first
     * @param provider          value written to the store root's {@code cq:commerceProvider} property
     */
    protected void run(ResourceResolver resourceResolver, String basePath, String storeName, boolean incrementalImport, String provider) {
        this.messages = new ArrayList<String>();
        this.errorCount = 0;
        this.saveBatchCount = 0;
        this.throttleBatchCount = 0;
        try {
            Node rootNode = this.setupStore(resourceResolver, basePath, storeName, !incrementalImport, provider);
            if (rootNode == null) {
                // setupStore() already logged the cause; stop here instead of failing with an NPE.
                log.error("Error while running import: no store root available under " + basePath);
                return;
            }
            this.disableWorkflows(resourceResolver);
            this.openThrottle(rootNode);
            this.doImport(resourceResolver, rootNode, incrementalImport);
        }
        catch (Exception e) {
            log.error("Error while running import", e);
        }
        finally {
            this.tickerComplete = true;
            // Nested so that a failure in one clean-up step cannot leave workflows disabled.
            try {
                this.checkpoint(resourceResolver.adaptTo(Session.class), true);
            }
            finally {
                try {
                    this.closeThrottle();
                }
                finally {
                    this.reenableWorkflows(resourceResolver);
                }
            }
        }
    }

    /**
     * Resolves the store root node, creating it as a {@code sling:Folder} path when it does not
     * exist, sets its {@code cq:commerceProvider} property and saves the session.
     *
     * @param resourceResolver resolver used to look up the store and obtain the session
     * @param basePath         path under which the store lives
     * @param storeName        optional store name; appended (mangled) to {@code basePath} when non-empty
     * @param clear            when {@code true}, all children of an existing store root are removed
     * @param provider         value for the {@code cq:commerceProvider} property
     * @return the store root, or whatever reference was obtained before a failure (possibly
     *         {@code null}); failures are logged, not thrown
     */
    protected Node setupStore(ResourceResolver resourceResolver, String basePath, String storeName, boolean clear, String provider) {
        Session session = resourceResolver.adaptTo(Session.class);
        boolean hasStoreName = StringUtils.isNotEmpty(storeName);
        String storePath = hasStoreName ? basePath + "/" + AbstractImporter.mangleName(storeName) : basePath;
        Resource rootResource = resourceResolver.getResource(storePath);
        Node rootNode = null;
        try {
            if (rootResource != null) {
                rootNode = rootResource.adaptTo(Node.class);
                if (clear && rootNode.hasNodes()) {
                    NodeIterator children = rootNode.getNodes();
                    while (children.hasNext()) {
                        children.nextNode().remove();
                    }
                }
            } else {
                rootNode = JcrUtil.createPath(storePath, false, "sling:Folder", "sling:Folder", session, false);
                if (hasStoreName) {
                    rootNode.setProperty("jcr:title", storeName);
                }
            }
            rootNode.setProperty("cq:commerceProvider", provider);
            session.save();
        }
        catch (Exception e) {
            log.error("Failed to initialize store: " + storePath, e);
        }
        return rootNode;
    }

    /**
     * Performs the actual import below {@code var2}.
     *
     * @param var1 resolver passed through from {@link #run}
     * @param var2 store root node returned by {@link #setupStore}
     * @param var3 the {@code incrementalImport} flag passed to {@link #run}
     * @throws RepositoryException on repository failures
     * @throws IOException         on I/O failures
     */
    protected abstract void doImport(ResourceResolver var1, Node var2, boolean var3) throws RepositoryException, IOException;

    /**
     * Creates a uniquely named {@code nt:unstructured} "temp" node under the store root and
     * opens a {@link JcrObservationThrottle} on it.
     *
     * @param storeRoot the store root node
     * @throws RepositoryException if the node cannot be created or the throttle cannot be opened
     */
    protected void openThrottle(Node storeRoot) throws RepositoryException {
        Node throttleNode = JcrUtil.createUniqueNode(storeRoot, "temp", "nt:unstructured", storeRoot.getSession());
        this.throttle = new JcrObservationThrottle(throttleNode);
        this.throttle.open();
    }

    /** Closes the throttle if one is open and forgets it, so a closed throttle is never reused. */
    protected void closeThrottle() {
        if (this.throttle == null) {
            return;
        }
        try {
            this.throttle.close();
        }
        finally {
            this.throttle = null;
        }
    }

    /**
     * Counts one unit of work and, when the save batch is full or {@code flush} is set, writes
     * the ticker node, saves the session and (if the throttle batch is full) waits on the
     * observation throttle. Queued events are posted afterwards, see
     * {@link #logEvent(String, String)}.
     *
     * @param session session to save
     * @param flush   {@code true} to force a save and to post all non-empty event queues
     */
    protected void checkpoint(Session session, boolean flush) {
        ++this.saveBatchCount;
        ++this.throttleBatchCount;
        if (flush || this.saveBatchCount > this.saveBatchSize) {
            this.writeTicker(session);
            try {
                session.save();
                this.saveBatchCount = 0;
            }
            catch (Exception e) {
                // NOTE(review): saveBatchCount is not reset on failure, so the same pending
                // items may be added to errorCount again on the next attempt - confirm intent.
                this.logMessage("ERROR saving session", false);
                this.errorCount += this.saveBatchCount;
                log.error("ERROR saving session", e);
            }
            // The throttle may be absent when openThrottle() never ran or failed.
            if (this.throttle != null && this.throttleBatchCount > this.throttleBatchSize) {
                try {
                    this.throttle.waitForEvents();
                    this.throttleBatchCount = 0;
                }
                catch (RepositoryException e) {
                    log.warn("Error while waiting for observation events", e);
                }
            }
        }
        this.postQueuedEvents(flush);
    }

    /** Writes message, error count and completion flag to the ticker node, if a ticker is set. */
    private void writeTicker(Session session) {
        if (StringUtils.isEmpty(this.tickerToken)) {
            return;
        }
        try {
            Node ticker = JcrUtils.getOrCreateByPath("/tmp/commerce/tickers/import_" + this.tickerToken, "nt:unstructured", session);
            ticker.setProperty("message", this.tickerMessage);
            ticker.setProperty("errorCount", (long)this.errorCount);
            ticker.setProperty("complete", this.tickerComplete);
        }
        catch (Exception e) {
            log.error("ERROR updating ticker", e);
        }
    }

    /** Posts every event queue that exceeds the batch size or, on flush, is non-empty; then empties it. */
    private void postQueuedEvents(boolean flush) {
        for (Map.Entry<String, Set<String>> queue : this.eventQueues.entrySet()) {
            Set<String> paths = queue.getValue();
            boolean batchFull = paths.size() > this.eventBatchSize;
            boolean flushPending = flush && !paths.isEmpty();
            if (!batchFull && !flushPending) {
                continue;
            }
            if (this.eventAdmin != null) {
                Hashtable<String, String[]> eventProperties = new Hashtable<String, String[]>();
                eventProperties.put("paths", paths.toArray(new String[paths.size()]));
                this.eventAdmin.postEvent(new Event(queue.getKey(), eventProperties));
            }
            // The queue is emptied even when no EventAdmin is bound (the events are dropped).
            paths.clear();
        }
    }

    /**
     * Decides whether an enabled workflow launcher entry should be disabled during the import.
     * This default implementation disables nothing.
     *
     * @param workflowConfigEntry the launcher entry under consideration
     * @return {@code true} to disable the entry for the duration of the import
     */
    protected boolean disableWorkflowPredicate(ConfigEntry workflowConfigEntry) {
        return false;
    }

    /**
     * Disables every enabled launcher entry accepted by {@link #disableWorkflowPredicate} and
     * remembers its id for {@link #reenableWorkflows}.
     *
     * @param resourceResolver resolver adapted to a {@link WorkflowLauncher}
     */
    protected void disableWorkflows(ResourceResolver resourceResolver) {
        WorkflowLauncher launcher = resourceResolver.adaptTo(WorkflowLauncher.class);
        if (launcher == null) {
            log.warn("No WorkflowLauncher available; workflows are left untouched");
            return;
        }
        try {
            Iterator<?> entries = launcher.getConfigEntries();
            while (entries.hasNext()) {
                ConfigEntry entry = (ConfigEntry)entries.next();
                if (entry.isEnabled() && this.disableWorkflowPredicate(entry)) {
                    entry.setEnabled(false);
                    launcher.editConfigEntry(entry.getId(), entry);
                    this.disabledWorkflows.add(entry.getId());
                }
            }
        }
        catch (WorkflowException e) {
            log.error("Error while disabling workflows", e);
        }
    }

    /**
     * Re-enables the launcher entries previously disabled by {@link #disableWorkflows}. Each id
     * is forgotten once its entry has been re-enabled, so a later run cannot switch on an entry
     * that was disabled by someone else in the meantime.
     *
     * @param resourceResolver resolver adapted to a {@link WorkflowLauncher}
     */
    protected void reenableWorkflows(ResourceResolver resourceResolver) {
        if (this.disabledWorkflows.isEmpty()) {
            return;
        }
        WorkflowLauncher launcher = resourceResolver.adaptTo(WorkflowLauncher.class);
        if (launcher == null) {
            log.error("Error while re-enabling workflows: no WorkflowLauncher available");
            return;
        }
        try {
            Iterator<?> entries = launcher.getConfigEntries();
            while (entries.hasNext()) {
                ConfigEntry entry = (ConfigEntry)entries.next();
                String id = entry.getId();
                if (this.disabledWorkflows.contains(id)) {
                    entry.setEnabled(true);
                    launcher.editConfigEntry(id, entry);
                    // Forget the id only after the edit succeeded.
                    this.disabledWorkflows.remove(id);
                }
            }
        }
        catch (WorkflowException e) {
            log.error("Error while re-enabling workflows", e);
        }
    }

    /**
     * Turns a display name into a node name: trims it, replaces spaces with dashes and passes
     * the result through {@code JcrUtil.createValidName}.
     *
     * @param name the raw name; may be {@code null} or empty
     * @return the mangled name, or an empty string for a {@code null}/empty input
     */
    protected static String mangleName(String name) {
        if (StringUtils.isEmpty(name)) {
            return "";
        }
        return JcrUtil.createValidName(name.trim().replace(" ", "-"));
    }

    /**
     * Creates each of the given tags that the {@link TagManager} reports as creatable. Invalid
     * tag ids are logged and skipped.
     *
     * @param resourceResolver resolver adapted to a {@link TagManager}
     * @param tags             tag ids to ensure; {@code null} is treated as empty
     */
    protected void createMissingTags(ResourceResolver resourceResolver, String[] tags) {
        if (tags == null || tags.length == 0) {
            return;
        }
        TagManager tm = resourceResolver.adaptTo(TagManager.class);
        if (tm == null) {
            log.error("Cannot create tags: no TagManager available");
            return;
        }
        for (String tag : tags) {
            try {
                if (tm.canCreateTag(tag)) {
                    tm.createTag(tag, null, null, false);
                }
            }
            catch (InvalidTagFormatException e) {
                log.error("Invalid tag ID", e);
            }
        }
    }

    /**
     * Queues {@code path} for the event topic {@code eventName}; queues are posted by
     * {@link #checkpoint}.
     *
     * @param eventName the event topic
     * @param path      the path to include in the event's {@code paths} property
     */
    protected void logEvent(String eventName, String path) {
        Set<String> paths = this.eventQueues.get(eventName);
        if (paths == null) {
            paths = new HashSet<String>();
            this.eventQueues.put(eventName, paths);
        }
        paths.add(path);
    }

    /**
     * Replaces {@code oldPath} with {@code newPath} in every event queue that contains it.
     *
     * @param oldPath the previously queued path
     * @param newPath the path to queue instead
     */
    protected void updateLoggedEvents(String oldPath, String newPath) {
        for (Set<String> paths : this.eventQueues.values()) {
            if (paths.remove(oldPath)) {
                paths.add(newPath);
            }
        }
    }

    /**
     * Records a message for the response (dropped once the message cap is reached) and, for
     * errors, increments the error count.
     *
     * @param message the message text
     * @param isError whether the message counts as an error
     */
    protected void logMessage(String message, boolean isError) {
        if (this.messages.size() < this.messageCap) {
            this.messages.add(message);
        }
        if (isError) {
            ++this.errorCount;
        }
    }

    /** @return the number of errors counted so far */
    protected int getErrorCount() {
        return this.errorCount;
    }

    /**
     * Starts a progress ticker; subsequent saving checkpoints write its state to
     * {@code /tmp/commerce/tickers/import_<tickerToken>}.
     *
     * @param tickerToken token identifying the ticker node
     * @param session     not used by this implementation
     */
    protected void initTicker(String tickerToken, Session session) {
        this.tickerToken = tickerToken;
        this.tickerMessage = "";
        this.tickerComplete = false;
    }

    /**
     * Sets the message written to the ticker node on the next saving checkpoint.
     *
     * @param tickerMessage the new ticker message
     */
    protected void updateTicker(String tickerMessage) {
        this.tickerMessage = tickerMessage;
    }

    /**
     * Writes the summary followed by the recorded messages as a {@code <pre>} block in a
     * minimal HTML page. An ellipsis line is appended when the message cap was reached.
     *
     * @param response the response to write to
     * @param summary  first line(s) of the output
     * @throws IOException if the response writer cannot be obtained
     */
    protected void respondWithMessages(SlingHttpServletResponse response, String summary) throws IOException {
        response.setContentType("text/html");
        response.setCharacterEncoding("UTF-8");
        PrintWriter pw = response.getWriter();
        pw.println("<html><body>");
        pw.println("<pre>");
        // NOTE(review): summary and messages are emitted without HTML escaping; if they can
        // contain imported data this should be escaped - confirm what callers pass in.
        pw.println(summary);
        if (this.messageCap > 0) {
            pw.println("");
            for (String msg : this.messages) {
                pw.println(msg);
            }
            if (this.messages.size() >= this.messageCap) {
                pw.println("...");
            }
        }
        pw.println("</pre>");
        pw.println("</body></html>");
        pw.flush();
    }

    /**
     * Stores the given {@link EventAdmin} for use by {@link #checkpoint}.
     *
     * @param eventAdmin the service to bind
     */
    protected void bindEventAdmin(EventAdmin eventAdmin) {
        this.eventAdmin = eventAdmin;
    }

    /**
     * Drops the stored {@link EventAdmin} if it is the instance being unbound.
     *
     * @param eventAdmin the service being unbound
     */
    protected void unbindEventAdmin(EventAdmin eventAdmin) {
        if (this.eventAdmin == eventAdmin) {
            this.eventAdmin = null;
        }
    }
}