ReplicationReceiverImpl.java 13.7 KB
/*
 * Decompiled with CFR 0_118.
 * 
 * Could not load the following classes:
 *  com.day.jcr.vault.fs.api.ProgressTrackerListener
 *  com.day.jcr.vault.fs.io.ImportOptions
 *  com.day.jcr.vault.packaging.JcrPackage
 *  com.day.jcr.vault.packaging.JcrPackageManager
 *  com.day.jcr.vault.packaging.Packaging
 *  com.day.jcr.vault.util.DefaultProgressListener
 *  javax.jcr.Item
 *  javax.jcr.Node
 *  javax.jcr.NodeIterator
 *  javax.jcr.RepositoryException
 *  javax.jcr.Session
 *  javax.jcr.nodetype.NodeDefinition
 *  org.apache.felix.scr.annotations.Activate
 *  org.apache.felix.scr.annotations.Component
 *  org.apache.felix.scr.annotations.Modified
 *  org.apache.felix.scr.annotations.Property
 *  org.apache.felix.scr.annotations.Reference
 *  org.apache.felix.scr.annotations.Service
 *  org.apache.sling.commons.osgi.PropertiesUtil
 *  org.osgi.service.event.Event
 *  org.osgi.service.event.EventAdmin
 *  org.slf4j.Logger
 *  org.slf4j.LoggerFactory
 */
package com.day.cq.replication.impl;

import com.day.cq.replication.Agent;
import com.day.cq.replication.ReplicationAction;
import com.day.cq.replication.ReplicationActionType;
import com.day.cq.replication.ReplicationEvent;
import com.day.cq.replication.ReplicationException;
import com.day.cq.replication.ReplicationPathTransformer;
import com.day.cq.replication.ReplicationReceiver;
import com.day.cq.replication.impl.ReplicationPathTransformerProvider;
import com.day.cq.replication.impl.content.durbo.DurboImportConfiguration;
import com.day.cq.replication.impl.content.durbo.DurboImportConfigurationProvider;
import com.day.cq.replication.impl.content.durbo.DurboImportResult;
import com.day.cq.replication.impl.content.durbo.DurboImporter;
import com.day.jcr.vault.fs.api.ProgressTrackerListener;
import com.day.jcr.vault.fs.io.ImportOptions;
import com.day.jcr.vault.packaging.JcrPackage;
import com.day.jcr.vault.packaging.JcrPackageManager;
import com.day.jcr.vault.packaging.Packaging;
import com.day.jcr.vault.util.DefaultProgressListener;
import java.io.IOException;
import java.io.InputStream;
import java.io.PrintWriter;
import java.io.Writer;
import java.util.List;
import java.util.Map;
import javax.jcr.Item;
import javax.jcr.Node;
import javax.jcr.NodeIterator;
import javax.jcr.RepositoryException;
import javax.jcr.Session;
import javax.jcr.nodetype.NodeDefinition;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Modified;
import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.Service;
import org.apache.sling.commons.osgi.PropertiesUtil;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventAdmin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * OSGi component implementing {@link ReplicationReceiver}.
 *
 * <p>For {@code ACTIVATE} actions the incoming stream is imported through a
 * {@link DurboImporter}; if the created node is a file below {@code /etc/packages/}
 * and installation was requested, the content package is extracted or installed.
 * For {@code DELETE} and {@code DEACTIVATE} actions the (optionally transformed)
 * target path is removed from the repository. After handling, a non-distributable
 * {@link ReplicationEvent} is posted through the {@link EventAdmin}.
 */
@Component(metatype=true, immediate=true)
@Service(value={ReplicationReceiver.class})
public class ReplicationReceiverImpl
implements ReplicationReceiver {
    private static final Logger log = LoggerFactory.getLogger(ReplicationReceiverImpl.class);

    /** Default for {@link #OSGI_PROP_TMPFILE_THRESHOLD}. */
    public static final long OSGI_PROP_TMPFILE_THRESHOLD_DEFAULT = 0x100000L;

    /** Configuration key for the temp-file threshold handed to the {@link DurboImporter}. */
    @Property(longValue={0x100000L})
    public static final String OSGI_PROP_TMPFILE_THRESHOLD = "receiver.tmpfile.threshold";

    /** Configuration key selecting {@code install} instead of {@code extract} for received packages. */
    @Property(boolValue={false})
    public static final String OSGI_PROP_INSTALL_PACKAGES = "receiver.packages.use.install";

    /** Number of removed nodes after which a recursive removal saves the session. */
    public static final int DEFAULT_SAVE_EVERY_HOW_MANY = 1000;

    @Reference
    private Packaging pkgSvc = null;
    @Reference
    private EventAdmin eventAdmin = null;
    @Reference
    private DurboImportConfigurationProvider durboImportConfigurationProvider = null;
    @Reference
    private ReplicationPathTransformerProvider transformerProvider = null;

    // Written by activate()/modified() and read by receive(); volatile so that a
    // configuration change is visible to threads that are handling replications.
    private volatile long tmpFileThreshold;
    private volatile boolean useInstall;

    /**
     * Reads the component configuration when the component is activated.
     *
     * @param props the component configuration
     */
    @Activate
    protected void activate(Map<String, Object> props) {
        this.update(props);
    }

    /**
     * Re-reads the component configuration when it is modified.
     *
     * @param props the updated component configuration
     */
    @Modified
    protected void modified(Map<String, Object> props) {
        this.update(props);
    }

    /**
     * Copies the configured threshold and install mode into this component,
     * falling back to {@link #OSGI_PROP_TMPFILE_THRESHOLD_DEFAULT} and {@code false}.
     *
     * @param props the component configuration
     */
    private void update(Map<String, Object> props) {
        this.tmpFileThreshold = PropertiesUtil.toLong(props.get(OSGI_PROP_TMPFILE_THRESHOLD), OSGI_PROP_TMPFILE_THRESHOLD_DEFAULT);
        this.useInstall = PropertiesUtil.toBoolean(props.get(OSGI_PROP_INSTALL_PACKAGES), false);
        log.info("Receiver started. threshold set to {}. useInstall={}", this.tmpFileThreshold, this.useInstall);
    }

    /**
     * Receives a replication with package installation enabled and
     * {@code binaryLess} disabled.
     */
    @Override
    public void receive(Session session, ReplicationAction action, InputStream in, long size, Writer writer) throws ReplicationException, IOException {
        this.receive(session, action, in, size, writer, true);
    }

    /**
     * Receives a replication with {@code binaryLess} disabled.
     *
     * @param install whether a received content package should be unpacked; this
     *                value is forwarded as given (it was previously replaced by a
     *                hard-coded {@code true})
     */
    @Override
    public void receive(Session session, ReplicationAction action, InputStream in, long size, Writer writer, boolean install) throws IOException, ReplicationException {
        this.receive(session, action, in, size, writer, install, false);
    }

    /**
     * Handles one replication action.
     *
     * @param session    session used for the import and for removals
     * @param action     the replication action to apply
     * @param in         the replicated content; only read for {@code ACTIVATE}
     * @param size       size value passed through to the importer
     * @param writer     receives status output; may be {@code null}
     * @param install    whether a received content package should be unpacked
     * @param binaryLess passed through to the importer
     * @throws ReplicationException if the action type is unknown or a removal fails
     * @throws IOException          if writing to {@code writer} fails
     */
    @Override
    public void receive(Session session, ReplicationAction action, InputStream in, long size, Writer writer, boolean install, boolean binaryLess) throws ReplicationException, IOException {
        // Final copy so the anonymous hook below can capture it.
        final ReplicationAction finalAction = action;
        final ReplicationPathTransformer transformer = this.transformerProvider.getTransformer(session, action, null);

        ReceiveListener listener = new ReceiveListener(session, action, writer, install);
        listener.setReplicationPathTransformer(transformer);

        Node receivedNode = null;
        if (action.getType() == ReplicationActionType.ACTIVATE) {
            DurboImporter durboImporter = new DurboImporter(this.durboImportConfigurationProvider.getConfiguration());
            durboImporter.setTempFileThreshold(this.tmpFileThreshold);
            if (transformer != null) {
                durboImporter.setHook(new DurboImporter.Hook(){

                    @Override
                    public String transform(Session session, String replicationPath) {
                        return transformer.transform(session, replicationPath, finalAction, null);
                    }

                    @Override
                    public void beforeSave(Node content) throws RepositoryException {
                        // nothing to do before the importer saves
                    }
                });
            }
            DurboImportResult importResult = durboImporter.createNode(session, action.getPath(), in, size, binaryLess);
            receivedNode = importResult.getCreatedNode();
            List<String> failedPaths = importResult.getFailedPaths();
            if (failedPaths != null && !failedPaths.isEmpty()) {
                this.writeFailedPaths(failedPaths, writer);
            }
        }
        listener.onReceived(receivedNode);
        Event replicationEvent = new ReplicationEvent(action).toNonDistributableEvent();
        this.eventAdmin.postEvent(replicationEvent);
    }

    /**
     * Reports paths that failed to import, one per line, framed by
     * {@code FAILED PATHS START} / {@code FAILED PATHS END}.
     *
     * @param failedPaths the paths to report
     * @param writer      target of the report; when {@code null} the paths are logged instead
     * @throws IOException if writing fails
     */
    private void writeFailedPaths(List<String> failedPaths, Writer writer) throws IOException {
        if (writer == null) {
            // The writer is optional elsewhere in this class; do not fail the whole
            // replication with a NullPointerException just because there is no one to tell.
            log.warn("Import reported {} failed path(s) but no writer is available: {}", failedPaths.size(), failedPaths);
            return;
        }
        writer.write("FAILED PATHS START\n");
        for (String failingPath : failedPaths) {
            writer.write(failingPath);
            writer.write("\n");
        }
        writer.write("FAILED PATHS END\n");
    }

    /**
     * Removes a node and its descendants, then saves the session.
     *
     * @param n the root of the subtree to remove
     * @return the number of nodes that were removed
     * @throws RepositoryException if traversal, removal or saving fails
     */
    private int removeRecursive(Node n) throws RepositoryException {
        // Read everything needed from the node up front: it may no longer exist
        // once remove() has returned.
        String path = n.getPath();
        Session session = n.getSession();
        int result = this.remove(n, path, 0);
        log.info("removeRecursive({}) done: {} nodes deleted, saving...", path, result);
        session.save();
        return result;
    }

    /**
     * Removes the descendants of {@code n} depth-first and then {@code n} itself,
     * skipping nodes whose definition is protected or mandatory. The session is
     * saved every {@link #DEFAULT_SAVE_EVERY_HOW_MANY} removed nodes.
     *
     * @param n            the node to process
     * @param startPath    path of the subtree root, used for logging only
     * @param deletedCount number of nodes removed so far
     * @return the updated number of removed nodes
     * @throws RepositoryException if traversal, removal or saving fails
     */
    private int remove(Node n, String startPath, int deletedCount) throws RepositoryException {
        NodeIterator children = n.getNodes();
        while (children.hasNext()) {
            deletedCount = this.remove(children.nextNode(), startPath, deletedCount);
        }
        NodeDefinition def = n.getDefinition();
        if (def.isProtected() || def.isMandatory()) {
            // Nothing was removed here, so the counter did not move and there is
            // nothing new to save.
            return deletedCount;
        }
        // Fetch the session before removing: the node is invalid afterwards.
        Session session = n.getSession();
        n.remove();
        ++deletedCount;
        if (deletedCount % DEFAULT_SAVE_EVERY_HOW_MANY == 0) {
            log.info("removeRecursive({}) in progress: {} nodes deleted, saving...", startPath, deletedCount);
            session.save();
        }
        return deletedCount;
    }

    protected void bindPkgSvc(Packaging packaging) {
        this.pkgSvc = packaging;
    }

    protected void unbindPkgSvc(Packaging packaging) {
        // Only clear the field if it still holds the service being unbound.
        if (this.pkgSvc == packaging) {
            this.pkgSvc = null;
        }
    }

    protected void bindEventAdmin(EventAdmin eventAdmin) {
        this.eventAdmin = eventAdmin;
    }

    protected void unbindEventAdmin(EventAdmin eventAdmin) {
        if (this.eventAdmin == eventAdmin) {
            this.eventAdmin = null;
        }
    }

    protected void bindDurboImportConfigurationProvider(DurboImportConfigurationProvider durboImportConfigurationProvider) {
        this.durboImportConfigurationProvider = durboImportConfigurationProvider;
    }

    protected void unbindDurboImportConfigurationProvider(DurboImportConfigurationProvider durboImportConfigurationProvider) {
        if (this.durboImportConfigurationProvider == durboImportConfigurationProvider) {
            this.durboImportConfigurationProvider = null;
        }
    }

    protected void bindTransformerProvider(ReplicationPathTransformerProvider replicationPathTransformerProvider) {
        this.transformerProvider = replicationPathTransformerProvider;
    }

    protected void unbindTransformerProvider(ReplicationPathTransformerProvider replicationPathTransformerProvider) {
        if (this.transformerProvider == replicationPathTransformerProvider) {
            this.transformerProvider = null;
        }
    }

    /**
     * Per-request helper that applies the repository-side effect of a single
     * replication action and writes the status line to the response writer.
     */
    private final class ReceiveListener {
        private final Session session;
        private final ReplicationAction action;
        private final Writer writer;
        private final boolean install;
        private ReplicationPathTransformer transformer;

        public void setReplicationPathTransformer(ReplicationPathTransformer transformer) {
            this.transformer = transformer;
        }

        private ReceiveListener(Session session, ReplicationAction action, Writer writer, boolean install) {
            this.session = session;
            this.action = action;
            this.writer = writer;
            this.install = install;
        }

        /**
         * Dispatches on the action type and, on success, writes
         * {@code "ReplicationAction <type> ok."} to the writer if there is one.
         *
         * @param node the node created by the import, or {@code null}
         * @throws ReplicationException if the type is unknown or handling fails;
         *                              the exception is logged before being rethrown
         * @throws IOException          if writing the status line fails
         */
        private void onReceived(Node node) throws ReplicationException, IOException {
            try {
                ReplicationActionType type = this.action.getType();
                switch (type) {
                    case ACTIVATE: {
                        this.onActivate(node);
                        break;
                    }
                    case DELETE: {
                        this.onDelete();
                        break;
                    }
                    case DEACTIVATE: {
                        this.onDeactivate();
                        break;
                    }
                    case TEST: {
                        this.onTest();
                        break;
                    }
                    default: {
                        throw new ReplicationException("Unknown ReplicationActionType: " + type);
                    }
                }
                if (this.writer != null) {
                    this.writer.write("ReplicationAction " + type.toString() + " ok.");
                }
            }
            catch (ReplicationException e) {
                log.error("Unable to receive replication.", e);
                throw e;
            }
        }

        /**
         * Unpacks a received content package. Only acts when installation was
         * requested and the node is an {@code nt:file} below {@code /etc/packages/}.
         * Failures are logged and never propagated, so a broken package does not
         * fail the replication itself.
         *
         * @param node the node created by the import; may be {@code null}
         */
        private void onActivate(Node node) throws ReplicationException {
            if (node == null) {
                log.warn("No node was created for activation of {}; nothing to unpack.", this.action.getPath());
                return;
            }
            String path = null;
            try {
                path = node.getPath();
            }
            catch (Exception e) {
                // Best effort: without a path the package check below is skipped.
                log.error("Error while retrieving path of node.", e);
            }
            if (!this.install || path == null || !path.startsWith("/etc/packages/")) {
                return;
            }
            JcrPackage pack = null;
            try {
                if (!node.isNodeType("{http://www.jcp.org/jcr/nt/1.0}file")) {
                    return;
                }
                ImportOptions opts = new ImportOptions();
                // The writer is optional; progress output is simply dropped without one.
                if (this.writer != null) {
                    this.writer.write("Content package received at " + path + ". Starting import.\n");
                    opts.setListener((ProgressTrackerListener)new DefaultProgressListener(new PrintWriter(this.writer)));
                }
                JcrPackageManager packMgr = ReplicationReceiverImpl.this.pkgSvc.getPackageManager(node.getSession());
                pack = packMgr.open(node);
                if (pack == null) {
                    log.warn("Node at {} could not be opened as a content package; skipping import.", path);
                    return;
                }
                if (ReplicationReceiverImpl.this.useInstall) {
                    pack.install(opts);
                } else {
                    pack.extract(opts);
                }
            }
            catch (Exception e) {
                // Deliberately broad: unpacking is best effort and must not abort the receive.
                log.error("Error while unpacking package at " + path, e);
            }
            finally {
                if (pack != null) {
                    pack.close();
                }
            }
        }

        /**
         * Removes the item at the action's path, after applying the path
         * transformer when one is set.
         *
         * @throws ReplicationException wrapping any {@link RepositoryException}
         */
        private void onDelete() throws ReplicationException {
            try {
                String path = this.action.getPath();
                if (this.transformer != null) {
                    path = this.transformer.transform(this.session, path, this.action, null);
                }
                this.remove(path);
            }
            catch (RepositoryException e) {
                throw new ReplicationException((Exception)e);
            }
        }

        /** Deactivation is handled exactly like deletion. */
        private void onDeactivate() throws ReplicationException {
            this.onDelete();
        }

        /**
         * Removes the item at {@code path} if it exists and saves the session.
         * Nodes are removed recursively; any other item is removed directly.
         *
         * @param path absolute path of the item to remove
         * @throws RepositoryException if lookup, removal or saving fails
         */
        private void remove(String path) throws RepositoryException {
            if (!this.session.itemExists(path)) {
                return;
            }
            Item item = this.session.getItem(path);
            if (item instanceof Node) {
                ReplicationReceiverImpl.this.removeRecursive((Node)item);
            } else {
                item.remove();
                this.session.save();
            }
        }

        /** Test actions have no repository-side effect. */
        private void onTest() {
        }
    }

}