// AgentReverseReplicationHandler.java 13.1 KB
/*
 * Decompiled with CFR 0_118.
 * 
 * Could not load the following classes:
 *  com.day.durbo.DurboInput
 *  javax.jcr.Item
 *  javax.jcr.Node
 *  javax.jcr.Property
 *  javax.jcr.RepositoryException
 *  javax.jcr.Session
 *  org.apache.commons.io.IOUtils
 *  org.apache.felix.scr.annotations.Activate
 *  org.apache.felix.scr.annotations.Component
 *  org.apache.felix.scr.annotations.Property
 *  org.apache.felix.scr.annotations.Reference
 *  org.apache.felix.scr.annotations.Service
 *  org.apache.jackrabbit.util.ISO8601
 *  org.slf4j.Logger
 *  org.slf4j.LoggerFactory
 */
package com.day.cq.replication.impl;

import com.day.cq.replication.Agent;
import com.day.cq.replication.ReplicationAction;
import com.day.cq.replication.ReplicationActionType;
import com.day.cq.replication.ReplicationContent;
import com.day.cq.replication.ReplicationException;
import com.day.cq.replication.ReplicationLog;
import com.day.cq.replication.Replicator;
import com.day.cq.replication.ReverseReplication;
import com.day.cq.replication.impl.RevRepEntry;
import com.day.cq.replication.impl.ReverseReplicationHandler;
import com.day.cq.replication.impl.content.durbo.DurboImportConfiguration;
import com.day.cq.replication.impl.content.durbo.DurboImportConfigurationProvider;
import com.day.cq.replication.impl.content.durbo.DurboImportResult;
import com.day.cq.replication.impl.content.durbo.DurboImporter;
import com.day.durbo.DurboInput;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import javax.jcr.Item;
import javax.jcr.Node;
import javax.jcr.RepositoryException;
import javax.jcr.Session;
import org.apache.commons.io.IOUtils;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.Service;
import org.apache.jackrabbit.util.ISO8601;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Component
@Service(value={ReverseReplicationHandler.class})
public class AgentReverseReplicationHandler
implements ReverseReplicationHandler,
DurboImporter.Hook {
    // SLF4J logger; assigned once in the constructor from the runtime class.
    private final Logger logger;
    // Name of the configuration property that holds the root paths (the same key is read in activate()).
    @Property(label="Root paths", description="Absolute root paths for automatic delete propagation", value={"/content", "/etc", "/home", "/var/dam"})
    public static final String ROOT_PATHS = "root-paths";
    // Root paths taken from the component configuration in activate(); consulted by
    // liesUnderneath() to decide whether a deletion is replicated onward.
    private String[] rootPaths;
    // Replicator reference (see bindReplicator/unbindReplicator); used by deleteContent()
    // to replicate deletions.
    @Reference
    private Replicator replicator;
    // Supplies the configuration handed to each DurboImporter created in importContent().
    @Reference
    private DurboImportConfigurationProvider durboImportConfigurationProvider;

    /**
     * Creates the handler and obtains its logger for the runtime class of this instance.
     */
    public AgentReverseReplicationHandler() {
        logger = LoggerFactory.getLogger(getClass());
    }

    /**
     * Reads the {@code root-paths} property from the component configuration.
     * <p>
     * The value is accepted either as a {@code String[]} or as a single {@code String}
     * (NOTE(review): some configuration sources are known to deliver a one-element
     * multi-value property as a plain string - confirm for this deployment). A missing
     * property, a {@code null} configuration map or a value of any other type results in
     * an empty array, so that {@code rootPaths} is never {@code null} after activation
     * and no deletion is propagated.
     *
     * @param config component configuration; may be {@code null}
     * @throws Exception declared for compatibility with the original signature
     */
    @Activate
    protected void activate(Map<String, Object> config) throws Exception {
        Object configured = config == null ? null : config.get("root-paths");
        if (configured instanceof String[]) {
            this.rootPaths = (String[])configured;
        } else if (configured instanceof String) {
            this.rootPaths = new String[]{(String)configured};
        } else {
            // Missing or unsupported value: fall back to "no root paths" instead of
            // failing with a ClassCastException or leaving the field null.
            this.rootPaths = new String[0];
        }
    }

    /**
     * Polls the agent for reverse-replicated items and imports them.
     * <p>
     * All polled items are handed to {@link #importAll}. As long as a pass leaves items
     * unprocessed but their number differs from the previous pass, another pass is made;
     * once a pass leaves the same number of items unprocessed as the one before, the import
     * is aborted. Whether the import loop finishes normally or an exception escapes from it,
     * the agent's next poll timeline is updated and the content of every item that was not
     * imported is destroyed (this post-processing runs in a {@code finally} block; the
     * decompiled original duplicated it in a {@code catch (Throwable)} branch).
     *
     * @param agent agent that is polled and whose log and poll timeline are used
     * @param agentSession session passed on to the import
     * @param action action handed to {@link Agent#poll}
     * @throws ReplicationException propagated from the agent or from post-processing
     */
    @Override
    public void poll(Agent agent, Session agentSession, ReplicationAction action) throws ReplicationException {
        ReverseReplication[] polled = agent.poll(action);
        if (polled == null || polled.length == 0) {
            return;
        }
        ReplicationLog log = agent.getLog();
        List<RevRepEntry> reps = new ArrayList<RevRepEntry>(polled.length);
        for (ReverseReplication replication : polled) {
            reps.add(new RevRepEntry(replication));
        }
        try {
            int numUnprocessed = reps.size();
            int lastSize = 0;
            int numRetries = 1;
            while (numUnprocessed > 0 && numRetries > 0) {
                numUnprocessed = this.importAll(reps, agentSession, log);
                if (numUnprocessed > 0) {
                    // No progress since the previous pass uses up a retry; any change resets it.
                    if (lastSize == numUnprocessed) {
                        --numRetries;
                    } else {
                        numRetries = 1;
                    }
                    if (numRetries > 0) {
                        log.info("Some of the polled replication data could not be imported. retrying with remaining " + numUnprocessed);
                    } else {
                        log.info("Some of the polled replication data could not be imported. Aborting.");
                    }
                }
                lastSize = numUnprocessed;
            }
        }
        finally {
            this.updatePollTimeline(agent, log, reps);
            this.destroyFailedContent(log, reps);
        }
    }

    /**
     * Sets the agent's next poll timeline from the entries processed so far.
     * <p>
     * Walks the entries in list order, tracking the largest {@code lastModified} of the
     * successful ones, and stops at the first failed entry. The timeline is only updated
     * when the resulting value is positive.
     */
    private void updatePollTimeline(Agent agent, ReplicationLog log, List<RevRepEntry> reps) {
        long lastModified = 0;
        for (RevRepEntry rep : reps) {
            if (!rep.success) {
                // Presumably so that a failed entry sharing the timestamp of the newest
                // success is polled again: step back one unit before the +1 below.
                if (lastModified == rep.lastModified) {
                    --lastModified;
                }
                break;
            }
            if (rep.lastModified > lastModified) {
                lastModified = rep.lastModified;
            }
        }
        if (lastModified > 0) {
            Calendar lastPoll = Calendar.getInstance();
            lastPoll.setTimeInMillis(lastModified + 1);
            log.info("Setting last poll time to %s", ISO8601.format(lastPoll));
            agent.setNextPollTimeline(lastPoll);
        } else {
            log.info("Not updating last poll time due to errors.");
        }
    }

    /**
     * Destroys the content of every entry that was not imported and logs an error for it.
     */
    private void destroyFailedContent(ReplicationLog log, List<RevRepEntry> reps) {
        for (RevRepEntry rep : reps) {
            if (rep.success) {
                continue;
            }
            rep.replication.getContent().destroy();
            log.error("Errors during importing reverse replicated content of %s", rep.replication.getAction().getPath());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Unable to fully structure code
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     * Lifted jumps to return sites
     */
    /*
     * Makes one import pass over all entries that are not yet marked successful and
     * returns how many of them failed in this pass.
     *
     * NOTE(review): this method is raw decompiler output and is not valid Java as it
     * stands (locals are never declared, "** break;" and the synthetic switch map are
     * decompiler artifacts). The comments below describe what the visible statements do;
     * which ReplicationActionType constants "case 1" and "case 2" stand for is not visible
     * here - presumably an activation-type and a deletion-type action, confirm against the
     * original source.
     *
     * Per entry:
     *  - the content's last-modified value is stored on the entry;
     *  - case 1 calls importContent(), case 2 calls deleteContent(); on success the entry
     *    is flagged, on ReplicationException the failure is logged at info level;
     *  - any other action type is only logged as not implemented;
     *  - afterwards the content is destroyed if the entry succeeded, otherwise the error
     *    counter is incremented.
     * When the iterator is exhausted the list is sorted (ordering is defined by
     * RevRepEntry, not visible here) and the error count is returned.
     */
    int importAll(List<RevRepEntry> reps, Session agentSession, ReplicationLog log) {
        numErrors = 0;
        var5_5 = reps.iterator();
        block13 : do lbl-1000: // 10 sources:
        {
            if (!var5_5.hasNext()) {
                // All entries visited: sort the list in place and report the failures of this pass.
                Collections.sort(reps);
                return numErrors;
            }
            rep = var5_5.next();
            // Entries imported in an earlier pass are skipped.
            if (rep.success) continue;
            r = rep.replication;
            // The remote outbox path is only used in log messages; "N/A" when absent.
            outboxPath = r.getProperties().get("outbox.item.path");
            if (outboxPath == null || outboxPath.length() == 0) {
                outboxPath = "N/A";
            }
            c = r.getContent();
            rep.lastModified = c.getLastModified();
            try {
                switch (.$SwitchMap$com$day$cq$replication$ReplicationActionType[r.getAction().getType().ordinal()]) {
                    case 1: {
                        // Content import branch.
                        try {
                            this.importContent(c, agentSession);
                            rep.success = true;
                            log.info("Successfully imported content at %s", new Object[]{r.getAction().getPath()});
                        }
                        catch (ReplicationException e) {
                            log.info("Cannot import content %s due to %s. Remote outbox path: %s", new Object[]{r.getAction().getPath(), e, outboxPath});
                            try {
                                // refresh(false): per the JCR Session API this discards pending changes.
                                agentSession.refresh(false);
                                ** break;
                            }
                            // A failure of the refresh itself is ignored.
                            catch (RepositoryException var11_13) {}
                        }
                        continue block13;
                    }
                    case 2: {
                        // Content deletion branch.
                        try {
                            this.deleteContent(r.getAction().getPath(), agentSession);
                            rep.success = true;
                        }
                        catch (ReplicationException e) {
                            log.info("Cannot delete %s: %s. Remote outbox path: %s", new Object[]{r.getAction().getPath(), e, outboxPath});
                        }
                        continue block13;
                    }
                }
                // Any other action type is not handled; the entry stays unsuccessful.
                log.warn("Reverse replication not implemented for action: %s. Remote outbox path: %s", new Object[]{r.getAction().getType(), outboxPath});
                continue;
            }
            finally {
                // Successful entries release their content; failed ones are counted.
                if (rep.success) {
                    c.destroy();
                    continue;
                }
                ++numErrors;
                continue;
            }
            break;
        } while (true);
    }

    /**
     * Removes the item at {@code path} and saves the session; when the path lies under
     * one of the configured root paths the deletion is additionally replicated.
     * <p>
     * A path that does not exist is only logged at debug level and is not an error.
     *
     * @param path absolute path of the item to remove
     * @param agentSession session used for the removal and passed to the replicator
     * @throws ReplicationException if the repository reports an error (the original
     *         {@link RepositoryException} is kept as cause) or if the replicator fails
     */
    private void deleteContent(String path, Session agentSession) throws ReplicationException {
        this.logger.debug("Starting content deletion for durbo request on path: {}", path);
        try {
            if (agentSession.itemExists(path)) {
                agentSession.getItem(path).remove();
                this.logger.debug("Saving...");
                agentSession.save();
                this.logger.info("Content {} deleted.", path);
                if (this.liesUnderneath(path)) {
                    // The placeholder previously had no argument, so the path was never logged.
                    this.logger.info("Replicating deletion of {}.", path);
                    this.replicator.replicate(agentSession, ReplicationActionType.DELETE, path);
                }
            } else {
                this.logger.debug("Content {} does not exist.", path);
            }
        }
        catch (RepositoryException e) {
            String msg = String.format("Repository error during content deletion: %s", e.getMessage());
            throw new ReplicationException(msg, (Exception)e);
        }
    }

    /**
     * Feeds the stream of the given replication content to a new {@link DurboImporter}
     * that uses this handler as its hook. Content without an input stream is skipped.
     * The stream is always closed quietly afterwards.
     *
     * @param content replication content to import
     * @param agentSession session handed to the importer
     * @throws ReplicationException if an {@link IOException} occurs while reading the content
     */
    private void importContent(ReplicationContent content, Session agentSession) throws ReplicationException {
        InputStream stream = null;
        try {
            stream = content.getInputStream();
            if (stream == null) {
                // Nothing to import; the finally block still runs (closing null is a no-op).
                return;
            }
            DurboImporter durboImporter = new DurboImporter(this.durboImportConfigurationProvider.getConfiguration());
            durboImporter.setHook(this);
            DurboInput durboInput = new DurboInput(stream);
            durboImporter.createNode(agentSession, null, durboInput, false);
        }
        catch (IOException ioe) {
            throw new ReplicationException("Unable to open input stream on replication content.", ioe);
        }
        finally {
            IOUtils.closeQuietly(stream);
        }
    }

    /**
     * Hook callback that maps a replication path to a repository path; this handler
     * performs no mapping.
     *
     * @param session unused
     * @param replicationPath path to transform
     * @return {@code replicationPath}, unchanged
     */
    @Override
    public String transform(Session session, String replicationPath) {
        return replicationPath;
    }

    /**
     * Hook callback that stamps replication status properties on a node before it is saved.
     * <p>
     * If the node has a {@code {http://www.jcp.org/jcr/1.0}content} child, the properties
     * are written to that child, otherwise to the node itself. The
     * {@code cq:ReplicationStatus} mixin is added when the target does not have it yet and
     * it can be added. Repository errors while writing the status are logged as warnings
     * and not propagated.
     *
     * @param node node about to be saved; {@code null} is ignored
     * @throws RepositoryException if resolving the session user or the content child fails
     */
    @Override
    public void beforeSave(Node node) throws RepositoryException {
        if (node == null) {
            return;
        }
        final String contentName = "{http://www.jcp.org/jcr/1.0}content";
        final String statusMixin = "cq:ReplicationStatus";

        final Calendar replicatedAt = Calendar.getInstance();
        final String replicatedBy = node.getSession().getUserID();
        final ReplicationActionType actionType = ReplicationActionType.ACTIVATE;
        final Node target = node.hasNode(contentName) ? node.getNode(contentName) : node;
        try {
            if (!target.isNodeType(statusMixin)) {
                if (target.canAddMixin(statusMixin)) {
                    target.addMixin(statusMixin);
                }
            }
            target.setProperty("cq:lastReplicationAction", actionType.getName());
            target.setProperty("cq:lastReplicatedBy", replicatedBy);
            target.setProperty("cq:lastReplicated", replicatedAt);
        }
        catch (RepositoryException failure) {
            this.logger.warn("unable to update replication status.", (Throwable)failure);
        }
    }

    /**
     * Tells whether {@code path} is one of the configured root paths or lies below one.
     * <p>
     * Matching respects path-segment boundaries: with root {@code /etc}, the paths
     * {@code /etc} and {@code /etc/foo} match but {@code /etcetera} does not. A root that
     * already ends with {@code /} is used as a plain prefix. A {@code null} path, missing
     * root paths and {@code null} entries never match.
     *
     * @param path path to test; may be {@code null}
     * @return {@code true} if the path equals or is a descendant of a configured root path
     */
    private boolean liesUnderneath(String path) {
        if (path == null || this.rootPaths == null) {
            return false;
        }
        for (String rootPath : this.rootPaths) {
            if (rootPath == null) {
                continue;
            }
            if (rootPath.endsWith("/")) {
                if (path.startsWith(rootPath)) {
                    return true;
                }
            } else if (path.equals(rootPath) || path.startsWith(rootPath + "/")) {
                return true;
            }
        }
        return false;
    }

    /**
     * Bind method for the {@code replicator} reference: stores the supplied instance.
     *
     * @param replicator replicator to use
     */
    protected void bindReplicator(Replicator replicator) {
        this.replicator = replicator;
    }

    /**
     * Unbind method for the {@code replicator} reference: clears the field, but only if
     * it still holds the very instance being unbound.
     *
     * @param replicator replicator being withdrawn
     */
    protected void unbindReplicator(Replicator replicator) {
        if (this.replicator != replicator) {
            return;
        }
        this.replicator = null;
    }

    /**
     * Bind method for the {@code durboImportConfigurationProvider} reference: stores the
     * supplied instance.
     *
     * @param durboImportConfigurationProvider provider to use
     */
    protected void bindDurboImportConfigurationProvider(DurboImportConfigurationProvider durboImportConfigurationProvider) {
        this.durboImportConfigurationProvider = durboImportConfigurationProvider;
    }

    /**
     * Unbind method for the {@code durboImportConfigurationProvider} reference: clears the
     * field, but only if it still holds the very instance being unbound.
     *
     * @param durboImportConfigurationProvider provider being withdrawn
     */
    protected void unbindDurboImportConfigurationProvider(DurboImportConfigurationProvider durboImportConfigurationProvider) {
        if (this.durboImportConfigurationProvider != durboImportConfigurationProvider) {
            return;
        }
        this.durboImportConfigurationProvider = null;
    }

}