package weka.knowledgeflow.steps;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;
import weka.core.Attribute;
import weka.core.DenseInstance;
import weka.core.Instance;
import weka.core.Instances;
import weka.core.Range;
import weka.core.SerializedObject;
import weka.core.WekaException;
import weka.knowledgeflow.Data;
import weka.knowledgeflow.StepManager;
import weka.knowledgeflow.steps.Sorter;

/**
 * Knowledge Flow step that performs an inner join on two incoming datasets or
 * instance streams.
 *
 * <p>Both inputs must already be sorted in ascending order of their key fields.
 * The step walks the two inputs in lock-step, which is a sort-merge join:
 * <ul>
 *   <li>when the head instances of both inputs have equal keys, a merged
 *       instance is emitted;</li>
 *   <li>otherwise the side with the smaller key is discarded.</li>
 * </ul>
 * Keys are expected to be unique within each input.
 *
 * <p>The key specification ({@link #setKeySpec(String)}) has two parts
 * separated by {@link #KEY_SPEC_SEPARATOR}: the keys of the first input and the
 * keys of the second input. Each part is either a Weka range string or a
 * comma-separated list of attribute names.
 */
@KFStep(category = "Flow", iconPath = "weka/gui/knowledgeflow/icons/Join.gif", name = "Join", toolTipText = "Performs an inner join on two incoming datasets/instance streams (IMPORTANT: assumes that both datasets are sorted in ascending order of the key fields). If data is not sorted then use a Sorter step to sort both into ascending order of the key fields. Does not handle the case where keys are not unique in one or both inputs.")
public class Join extends BaseStep {

    /** Separates the first-input keys from the second-input keys in the key specification. */
    public static final String KEY_SPEC_SEPARATOR = "@@KS@@";

    private static final long serialVersionUID = -8248954818247532014L;

    /** Streaming buffer size above which a producer waits (until notified) before continuing. */
    private static final int MAX_BUFFER_SIZE = 100;

    /** Number of copies of the merged header rotated through when streaming with string attributes. */
    private static final int HEADER_POOL_SIZE = 10;

    /** Round-robin counter used to pick the next header from {@link #m_headerPool}. */
    protected transient AtomicInteger m_count;

    /** Pending instances from the first input, in arrival order. */
    protected transient Queue<Sorter.InstanceHolder> m_firstBuffer;

    /** True once the first input has signalled the end of its stream. */
    protected transient boolean m_firstFinished;

    /** Step manager of the first connected input. */
    protected StepManager m_firstInput;

    /** True while the first input's producer is blocked in {@link #addToFirstBuffer(Instance)}. */
    protected boolean m_firstIsWaiting;

    /** Structure (zero-instance header) of the first input. */
    protected transient Instances m_headerOne;

    /**
     * Deserialized copies of the merged header used for streaming output with string attributes.
     * Presumably this keeps consecutive output instances from sharing one string-valued header.
     */
    protected transient List<Instances> m_headerPool;

    /** Structure (zero-instance header) of the second input. */
    protected transient Instances m_headerTwo;

    /** Indexes of the key attributes in the first input. */
    protected int[] m_keyIndexesOne;

    /** Indexes of the key attributes in the second input (parallel to {@link #m_keyIndexesOne}). */
    protected int[] m_keyIndexesTwo;

    /** Output structure: all first-input attributes followed by all second-input attributes. */
    protected transient Instances m_mergedHeader;

    /** True while processing instance streams rather than batch datasets. */
    protected boolean m_runningIncrementally;

    /** Pending instances from the second input, in arrival order. */
    protected transient Queue<Sorter.InstanceHolder> m_secondBuffer;

    /** True once the second input has signalled the end of its stream. */
    protected transient boolean m_secondFinished;

    /** Step manager of the second connected input. */
    protected StepManager m_secondInput;

    /** True while the second input's producer is blocked in {@link #addToSecondBuffer(Instance)}. */
    protected boolean m_secondIsWaiting;

    /** Reusable container for streaming output. */
    protected Data m_streamingData;

    /** Names and indexes of the first input's string attributes (streaming only). */
    protected Map<String, Integer> m_stringAttIndexesOne;

    /** Names and indexes of the second input's string attributes (streaming only). */
    protected Map<String, Integer> m_stringAttIndexesTwo;

    /** Whether the merged header contains string attributes. */
    protected boolean m_stringAttsPresent;

    protected String m_firstInputConnectionType = "";
    protected String m_secondInputConnectionType = "";

    /** Key specification: first-input keys, {@link #KEY_SPEC_SEPARATOR}, second-input keys. */
    protected String m_keySpec = "";

    /**
     * Snapshots the current values of the named string attributes into the holder.
     * This is done at buffering time, because the instance's own header may change later.
     *
     * @param holder     the holder whose {@code m_instance} is read and whose {@code m_stringVals} is filled
     * @param stringAtts string attribute names (keys) to copy
     */
    private static void copyStringAttVals(Sorter.InstanceHolder holder, Map<String, Integer> stringAtts) {
        for (String attName : stringAtts.keySet()) {
            Instance inst = holder.m_instance;
            String value = inst.stringValue(inst.dataset().attribute(attName));
            if (holder.m_stringVals == null) {
                holder.m_stringVals = new HashMap<>();
            }
            holder.m_stringVals.put(attName, value);
        }
    }

    /** Wraps an instance in a holder, caching the values of the given string attributes. */
    private static Sorter.InstanceHolder newHolder(Instance instance, Map<String, Integer> stringAtts) {
        Sorter.InstanceHolder holder = new Sorter.InstanceHolder();
        holder.m_instance = instance;
        copyStringAttVals(holder, stringAtts);
        return holder;
    }

    /**
     * Blocks the calling producer until another thread calls {@code notifyAll()} on this step.
     * If the thread is interrupted while waiting, its interrupt status is restored.
     */
    private synchronized void waitForNotify() {
        try {
            wait();
        } catch (InterruptedException e) {
            // Restore the interrupt so the framework (and later blocking calls) can observe it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Buffers an instance from the first input. Blocks when the buffer exceeds
     * {@link #MAX_BUFFER_SIZE} and the second input has not finished.
     *
     * @param instance the incoming instance
     */
    protected synchronized void addToFirstBuffer(Instance instance) {
        if (isStopRequested()) {
            return;
        }
        m_firstBuffer.add(newHolder(instance, m_stringAttIndexesOne));
        if (m_firstBuffer.size() > MAX_BUFFER_SIZE && !m_secondFinished) {
            m_firstIsWaiting = true;
            waitForNotify();
        }
    }

    /**
     * Buffers an instance from the second input. Blocks when the buffer exceeds
     * {@link #MAX_BUFFER_SIZE} and the first input has not finished.
     *
     * @param instance the incoming instance
     */
    protected synchronized void addToSecondBuffer(Instance instance) {
        if (isStopRequested()) {
            return;
        }
        m_secondBuffer.add(newHolder(instance, m_stringAttIndexesTwo));
        if (m_secondBuffer.size() > MAX_BUFFER_SIZE && !m_firstFinished) {
            m_secondIsWaiting = true;
            waitForNotify();
        }
    }

    /**
     * Joins whatever remains in the two buffers after both streams have finished.
     * Each merged instance is output downstream.
     *
     * @throws WekaException if outputting data fails
     */
    protected synchronized void clearBuffers() throws WekaException {
        while (!m_firstBuffer.isEmpty() && !m_secondBuffer.isEmpty()) {
            if (isStopRequested()) {
                return;
            }
            getStepManager().throughputUpdateStart();
            Instance merged = processBuffers();
            getStepManager().throughputUpdateEnd();
            // processBuffers() returns null when it only discarded non-matching rows.
            if (merged != null) {
                m_streamingData.setPayloadElement("instance", merged);
                getStepManager().outputData(m_streamingData);
            }
        }
    }

    /**
     * Compares two instances on their key fields, in key order.
     *
     * <p>Reconstructed: the decompiler could not recover this method's body.
     * NOTE(review): confirm against the upstream Weka implementation.
     * <ul>
     *   <li>A key missing in both instances is treated as equal.</li>
     *   <li>A key missing in only one instance makes that instance compare as smaller,
     *       so that side is discarded by {@link #processBuffers()}.</li>
     *   <li>Numeric/date keys compare by value.</li>
     *   <li>Other keys compare by their string form.</li>
     * </ul>
     *
     * @param one  instance from the first input
     * @param two  instance from the second input
     * @param oneH holder of {@code one}; may carry cached string values (streaming)
     * @param twoH holder of {@code two}; may carry cached string values (streaming)
     * @return a negative value, zero, or a positive value as {@code one}'s keys are
     *         less than, equal to, or greater than {@code two}'s
     */
    protected int compare(Instance one, Instance two, Sorter.InstanceHolder oneH, Sorter.InstanceHolder twoH) {
        for (int k = 0; k < m_keyIndexesOne.length; k++) {
            int indexOne = m_keyIndexesOne[k];
            int indexTwo = m_keyIndexesTwo[k];
            boolean missingOne = one.isMissing(indexOne);
            boolean missingTwo = two.isMissing(indexTwo);
            if (missingOne && missingTwo) {
                continue;
            }
            if (missingOne || missingTwo) {
                // Ensure the input with the missing value is the one that gets discarded.
                return missingOne ? -1 : 1;
            }
            int cmp;
            if (m_headerOne.attribute(indexOne).isNumeric()) {
                double v1 = one.value(indexOne);
                double v2 = two.value(indexTwo);
                // Explicit comparison so that 0.0 and -0.0 count as equal keys.
                cmp = v1 == v2 ? 0 : (v1 < v2 ? -1 : 1);
            } else {
                cmp = keyString(one, indexOne, oneH).compareTo(keyString(two, indexTwo, twoH));
            }
            if (cmp != 0) {
                return cmp;
            }
        }
        return 0;
    }

    /**
     * Returns the string form of a key value.
     * For string attributes, the value cached in the holder at buffering time is preferred.
     */
    private static String keyString(Instance inst, int index, Sorter.InstanceHolder holder) {
        if (holder != null && holder.m_stringVals != null && !holder.m_stringVals.isEmpty()
                && inst.attribute(index).isString()) {
            String cached = holder.m_stringVals.get(inst.attribute(index).name());
            if (cached != null) {
                return cached;
            }
        }
        return inst.stringValue(index);
    }

    /**
     * Determines which incoming step is the first input and which is the second.
     * Uses the iteration order of the step manager's incoming connections.
     */
    protected void establishFirstAndSecondConnectedInputs() {
        m_firstInput = null;
        m_secondInput = null;
        outer:
        for (Map.Entry<String, List<StepManager>> entry : getStepManager().getIncomingConnections().entrySet()) {
            for (StepManager candidate : entry.getValue()) {
                if (m_firstInput == null) {
                    m_firstInput = candidate;
                    m_firstInputConnectionType = entry.getKey();
                } else {
                    m_secondInput = candidate;
                    m_secondInputConnectionType = entry.getKey();
                    break outer;
                }
            }
        }
    }

    /**
     * Parses one half of the key specification into attribute indexes for {@code header}.
     * The spec is tried as a Weka range first, then as a comma-separated list of attribute names.
     *
     * @throws WekaException if the spec is a name list containing an unknown attribute
     */
    private static int[] parseKeyIndexes(String spec, Instances header) throws WekaException {
        Range range = new Range();
        range.setUpper(header.numAttributes());
        try {
            range.setRanges(spec);
            return range.getSelection();
        } catch (IllegalArgumentException notARange) {
            String[] names = spec.split(",");
            int[] indexes = new int[names.length];
            for (int n = 0; n < names.length; n++) {
                Attribute attribute = header.attribute(names[n].trim());
                if (attribute == null) {
                    throw new WekaException("Invalid key attribute name");
                }
                indexes[n] = attribute.index();
            }
            return indexes;
        }
    }

    /**
     * Resolves the key specification against both input headers and builds the merged output header.
     * When string attributes are present, it also builds the streaming header pool.
     *
     * <p>Second-input attributes whose names are already used get a {@code "_2"} suffix.
     * The suffix is repeated if that name is also taken.
     *
     * @throws WekaException if the key specification is empty, malformed, names unknown
     *                       attributes, has differing key counts or key types, or the
     *                       header pool cannot be created
     */
    protected void generateMergedHeader() throws WekaException {
        if (m_keySpec == null || m_keySpec.length() == 0) {
            throw new WekaException("Key fields are null!");
        }
        String[] parts = environmentSubstitute(m_keySpec).split(KEY_SPEC_SEPARATOR);
        if (parts.length != 2) {
            throw new WekaException("Invalid key specification");
        }
        m_keyIndexesOne = parseKeyIndexes(parts[0].trim(), m_headerOne);
        m_keyIndexesTwo = parseKeyIndexes(parts[1].trim(), m_headerTwo);
        if (m_keyIndexesOne.length != m_keyIndexesTwo.length) {
            throw new WekaException("Number of key fields are different for each input");
        }
        for (int k = 0; k < m_keyIndexesOne.length; k++) {
            Attribute keyOne = m_headerOne.attribute(m_keyIndexesOne[k]);
            Attribute keyTwo = m_headerTwo.attribute(m_keyIndexesTwo[k]);
            if (keyOne.type() != keyTwo.type()) {
                throw new WekaException("Type of key corresponding to key fields differ: input 1 - " + Attribute.typeToStringShort(keyOne) + " input 2 - " + Attribute.typeToStringShort(keyTwo));
            }
        }

        ArrayList<Attribute> attributes = new ArrayList<>();
        HashSet<String> usedNames = new HashSet<>();
        for (int a = 0; a < m_headerOne.numAttributes(); a++) {
            Attribute attribute = m_headerOne.attribute(a);
            attributes.add((Attribute) attribute.copy());
            usedNames.add(attribute.name());
        }
        for (int a = 0; a < m_headerTwo.numAttributes(); a++) {
            Attribute attribute = m_headerTwo.attribute(a);
            String name = attribute.name();
            // Keep appending the suffix until the name is unique in the merged header.
            while (usedNames.contains(name)) {
                name = name + "_2";
            }
            usedNames.add(name);
            attributes.add(attribute.copy(name));
        }
        m_mergedHeader = new Instances(m_headerOne.relationName() + "+" + m_headerTwo.relationName(), attributes, 0);

        m_stringAttsPresent = m_mergedHeader.checkForStringAttributes();
        if (m_stringAttsPresent) {
            m_headerPool = new ArrayList<>();
            m_count = new AtomicInteger();
            try {
                SerializedObject serialized = new SerializedObject(m_mergedHeader);
                for (int p = 0; p < HEADER_POOL_SIZE; p++) {
                    m_headerPool.add((Instances) serialized.getObject());
                }
            } catch (Exception e) {
                // An incomplete pool would later fail in generateMergedInstance; fail early instead.
                throw new WekaException(e);
            }
        }
    }

    /**
     * Builds an output instance from a matching pair.
     * The first holder's values are followed by the second holder's values.
     *
     * <p>When streaming with string attributes, the instance is attached to the next header
     * from the pool. Cached string values are written into that header, and the instance
     * stores index 0 for them. Missing string values remain missing.
     *
     * @param instanceHolder  holder from the first input
     * @param instanceHolder2 holder from the second input
     * @return the merged instance
     */
    protected synchronized Instance generateMergedInstance(Sorter.InstanceHolder instanceHolder, Sorter.InstanceHolder instanceHolder2) {
        double[] values = new double[m_mergedHeader.numAttributes()];
        Instances outputHeader = m_mergedHeader;
        if (m_runningIncrementally && m_stringAttsPresent) {
            // floorMod keeps the index valid even after the counter overflows.
            outputHeader = m_headerPool.get(Math.floorMod(m_count.getAndIncrement(), m_headerPool.size()));
        }
        int next = copyValuesInto(instanceHolder, m_headerOne.numAttributes(), values, 0, outputHeader);
        copyValuesInto(instanceHolder2, m_headerTwo.numAttributes(), values, next, outputHeader);
        DenseInstance merged = new DenseInstance(1.0d, values);
        merged.setDataset(outputHeader);
        return merged;
    }

    /**
     * Copies one holder's attribute values into {@code values}, starting at {@code offset}.
     *
     * @return the index just past the last value written
     */
    private int copyValuesInto(Sorter.InstanceHolder holder, int numAttributes, double[] values, int offset, Instances outputHeader) {
        Instance source = holder.m_instance;
        boolean haveCachedStrings = holder.m_stringVals != null && !holder.m_stringVals.isEmpty();
        int target = offset;
        for (int a = 0; a < numAttributes; a++, target++) {
            values[target] = source.value(a);
            if (haveCachedStrings && !source.isMissing(a) && m_mergedHeader.attribute(target).isString()) {
                outputHeader.attribute(target).setStringValue(holder.m_stringVals.get(source.attribute(a).name()));
                values[target] = 0.0d;
            }
        }
        return target;
    }

    /**
     * Returns the names of the first and second connected inputs.
     * A {@code null} entry means that input is not connected.
     */
    public List<String> getConnectedInputNames() {
        establishFirstAndSecondConnectedInputs();
        List<String> names = new ArrayList<>();
        names.add(m_firstInput == null ? null : m_firstInput.getName());
        names.add(m_secondInput == null ? null : m_secondInput.getName());
        return names;
    }

    @Override // weka.knowledgeflow.steps.BaseStep, weka.knowledgeflow.steps.Step
    public String getCustomEditorForStep() {
        return "weka.gui.knowledgeflow.steps.JoinStepEditorDialog";
    }

    /**
     * Returns the structure delivered by the first input.
     *
     * @return the structure, or {@code null} if no first input is connected
     * @throws WekaException if the structure cannot be determined
     */
    public Instances getFirstInputStructure() throws WekaException {
        if (m_firstInput == null) {
            establishFirstAndSecondConnectedInputs();
        }
        return m_firstInput == null ? null : getStepManager().getIncomingStructureFromStep(m_firstInput, m_firstInputConnectionType);
    }

    /**
     * Returns the connection types this step accepts.
     * <ul>
     *   <li>With no inputs, any of the four supported types is accepted.</li>
     *   <li>With one input, only that input's type is accepted.</li>
     *   <li>With two inputs, {@code null} is returned.</li>
     * </ul>
     */
    @Override // weka.knowledgeflow.steps.Step, weka.knowledgeflow.steps.BaseStepExtender
    public List<String> getIncomingConnectionTypes() {
        int numIncoming = getStepManager().numIncomingConnections();
        if (numIncoming == 0) {
            return Arrays.asList("instance", StepManager.CON_DATASET, StepManager.CON_TRAININGSET, StepManager.CON_TESTSET);
        }
        if (numIncoming == 1) {
            return new ArrayList<>(getStepManager().getIncomingConnections().keySet());
        }
        return null;
    }

    /** Returns the key specification. */
    public String getKeySpec() {
        return m_keySpec;
    }

    /**
     * Returns the outgoing connection types.
     * These mirror the incoming connection types; {@code null} is returned when there are no inputs.
     */
    @Override // weka.knowledgeflow.steps.Step, weka.knowledgeflow.steps.BaseStepExtender
    public List<String> getOutgoingConnectionTypes() {
        if (getStepManager().numIncomingConnections() <= 0) {
            return null;
        }
        return new ArrayList<>(getStepManager().getIncomingConnections().keySet());
    }

    /**
     * Returns the structure delivered by the second input.
     *
     * @return the structure, or {@code null} if no second input is connected
     * @throws WekaException if the structure cannot be determined
     */
    public Instances getSecondInputStructure() throws WekaException {
        if (m_secondInput == null) {
            establishFirstAndSecondConnectedInputs();
        }
        return m_secondInput == null ? null : getStepManager().getIncomingStructureFromStep(m_secondInput, m_secondInputConnectionType);
    }

    /** Appends every instance of {@code batch} to {@code buffer}, stopping early on a stop request. */
    private void bufferBatch(Instances batch, Queue<Sorter.InstanceHolder> buffer) {
        for (int i = 0; i < batch.numInstances() && !isStopRequested(); i++) {
            Sorter.InstanceHolder holder = new Sorter.InstanceHolder();
            holder.m_instance = batch.instance(i);
            buffer.add(holder);
        }
    }

    /**
     * Buffers an incoming batch dataset.
     * Once both buffers hold data, joins them and outputs the result on every outgoing connection.
     *
     * @param data the incoming batch
     * @throws WekaException if the data comes from an unknown input or the key specification is invalid
     */
    protected synchronized void processBatch(Data data) throws WekaException {
        Instances batch = (Instances) data.getPrimaryPayload();
        StepManager source = data.getSourceStep().getStepManager();
        if (source == m_firstInput) {
            m_headerOne = new Instances(batch, 0);
            getStepManager().logDetailed("Receiving batch from " + m_firstInput.getName());
            bufferBatch(batch, m_firstBuffer);
        } else if (source == m_secondInput) {
            m_headerTwo = new Instances(batch, 0);
            getStepManager().logDetailed("Receiving batch from " + m_secondInput.getName());
            bufferBatch(batch, m_secondBuffer);
        } else {
            throw new WekaException("This should never happen");
        }
        if (m_firstBuffer.isEmpty() || m_secondBuffer.isEmpty()) {
            return;
        }
        getStepManager().processing();
        generateMergedHeader();
        Instances joined = new Instances(m_mergedHeader, 0);
        while (!isStopRequested() && !m_firstBuffer.isEmpty() && !m_secondBuffer.isEmpty()) {
            Instance merged = processBuffers();
            if (merged != null) {
                joined.add(merged);
            }
        }
        for (String connection : getStepManager().getOutgoingConnections().keySet()) {
            if (isStopRequested()) {
                return;
            }
            Data out = new Data(connection, joined);
            out.setPayloadElement(StepManager.CON_AUX_DATA_SET_NUM, 1);
            out.setPayloadElement(StepManager.CON_AUX_DATA_MAX_SET_NUM, 1);
            getStepManager().outputData(out);
        }
        getStepManager().finished();
    }

    /**
     * Performs one merge step on the heads of the two buffers.
     *
     * <p>Reconstructed from the instruction dump, which the decompiler could not turn into Java.
     * <ul>
     *   <li>Equal keys: both heads are removed and their merged instance is returned.</li>
     *   <li>Otherwise, heads are removed from the side with the smaller key until its key is
     *       no longer smaller, or that buffer is empty. {@code null} is returned.</li>
     * </ul>
     *
     * @return the merged instance, or {@code null} if no pair matched in this step
     */
    protected synchronized Instance processBuffers() {
        if (m_firstBuffer.isEmpty() || m_secondBuffer.isEmpty()) {
            return null;
        }
        Sorter.InstanceHolder headOne = m_firstBuffer.peek();
        Sorter.InstanceHolder headTwo = m_secondBuffer.peek();
        int cmp = compare(headOne.m_instance, headTwo.m_instance, headOne, headTwo);
        if (cmp == 0) {
            return generateMergedInstance(m_firstBuffer.remove(), m_secondBuffer.remove());
        }
        if (cmp < 0) {
            while (cmp < 0 && !m_firstBuffer.isEmpty()) {
                m_firstBuffer.remove();
                if (!m_firstBuffer.isEmpty()) {
                    headOne = m_firstBuffer.peek();
                    cmp = compare(headOne.m_instance, headTwo.m_instance, headOne, headTwo);
                }
            }
        } else {
            while (cmp > 0 && !m_secondBuffer.isEmpty()) {
                m_secondBuffer.remove();
                if (!m_secondBuffer.isEmpty()) {
                    headTwo = m_secondBuffer.peek();
                    cmp = compare(headOne.m_instance, headTwo.m_instance, headOne, headTwo);
                }
            }
        }
        return null;
    }

    /**
     * Dispatches incoming data to streaming or batch processing.
     * Reports an interruption when a stop has been requested.
     */
    @Override // weka.knowledgeflow.steps.BaseStep, weka.knowledgeflow.steps.Step, weka.knowledgeflow.steps.BaseStepExtender
    public void processIncoming(Data data) throws WekaException {
        if (data.getConnectionName().equals("instance")) {
            processStreaming(data);
        } else {
            processBatch(data);
        }
        if (isStopRequested()) {
            getStepManager().interrupted();
        }
    }

    /** Maps each string attribute name in {@code header} to its index. */
    private static Map<String, Integer> stringAttributeIndexes(Instances header) {
        Map<String, Integer> indexes = new HashMap<>();
        for (int a = 0; a < header.numAttributes(); a++) {
            if (header.attribute(a).isString()) {
                indexes.put(header.attribute(a).name(), a);
            }
        }
        return indexes;
    }

    /**
     * Handles an end-of-stream signal from one input.
     * <ul>
     *   <li>Wakes any producer blocked waiting on the other input, because no further data
     *       from the finished side will trigger a wake-up.</li>
     *   <li>When both inputs have finished, drains the buffers and signals throughput completion.</li>
     * </ul>
     */
    private synchronized void handleStreamFinished(StepManager source) throws WekaException {
        if (source == m_firstInput) {
            m_firstFinished = true;
            getStepManager().logBasic("Finished receiving from " + m_firstInput.getName());
        } else if (source == m_secondInput) {
            m_secondFinished = true;
            getStepManager().logBasic("Finished receiving from " + m_secondInput.getName());
        }
        m_firstIsWaiting = false;
        m_secondIsWaiting = false;
        notifyAll();
        if (m_firstFinished && m_secondFinished) {
            clearBuffers();
            m_streamingData.clearPayload();
            getStepManager().throughputFinished(m_streamingData);
        }
    }

    /**
     * Handles one streamed instance or end-of-stream signal.
     * <ul>
     *   <li>Initializes per-input headers on first contact.</li>
     *   <li>Builds the merged header once both inputs are known.</li>
     *   <li>Buffers the instance and performs one merge step, outputting any match.</li>
     * </ul>
     *
     * @param data the incoming streaming data
     * @throws WekaException if the key specification is invalid or output fails
     */
    protected synchronized void processStreaming(Data data) throws WekaException {
        if (isStopRequested()) {
            return;
        }
        m_runningIncrementally = true;
        StepManager source = data.getSourceStep().getStepManager();
        if (getStepManager().isStreamFinished(data)) {
            handleStreamFinished(source);
            return;
        }
        Instance instance = (Instance) data.getPrimaryPayload();
        if (m_headerOne == null && source == m_firstInput) {
            m_headerOne = new Instances(instance.dataset(), 0);
            getStepManager().logBasic("Initializing buffer for " + m_firstInput.getName());
            m_stringAttIndexesOne = stringAttributeIndexes(m_headerOne);
        }
        if (m_headerTwo == null && source == m_secondInput) {
            m_headerTwo = new Instances(instance.dataset(), 0);
            getStepManager().logBasic("Initializing buffer for " + m_secondInput.getName());
            m_stringAttIndexesTwo = stringAttributeIndexes(m_headerTwo);
        }
        if (m_mergedHeader == null && m_headerOne != null && m_headerTwo != null) {
            // Throws a descriptive WekaException for a missing or invalid key spec,
            // rather than failing later inside compare().
            generateMergedHeader();
        }

        if (source == m_firstInput) {
            addToFirstBuffer(instance);
        } else {
            addToSecondBuffer(instance);
        }

        // NOTE(review): both branches test the second buffer's size. The first producer blocks
        // on its own buffer size, so confirm which buffer was intended in the second branch.
        if (source == m_firstInput && m_secondBuffer.size() <= MAX_BUFFER_SIZE && m_secondIsWaiting) {
            m_secondIsWaiting = false;
            notifyAll();
        } else if (source == m_secondInput && m_secondBuffer.size() <= MAX_BUFFER_SIZE && m_firstIsWaiting) {
            m_firstIsWaiting = false;
            notifyAll();
        }
        if (isStopRequested()) {
            return;
        }
        Instance merged = processBuffers();
        if (merged != null) {
            getStepManager().throughputUpdateStart();
            m_streamingData.setPayloadElement("instance", merged);
            getStepManager().outputData(m_streamingData);
            getStepManager().throughputUpdateEnd();
        }
    }

    /** Sets the key specification (see {@link #KEY_SPEC_SEPARATOR}). */
    public void setKeySpec(String str) {
        m_keySpec = str;
    }

    /**
     * Resets all per-run state and identifies the two inputs.
     *
     * @throws WekaException if fewer than two incoming connections exist
     */
    @Override // weka.knowledgeflow.steps.Step, weka.knowledgeflow.steps.BaseStepExtender
    public void stepInit() throws WekaException {
        m_firstBuffer = new LinkedList<>();
        m_secondBuffer = new LinkedList<>();
        m_streamingData = new Data("instance");
        m_firstInput = null;
        m_secondInput = null;
        m_headerOne = null;
        m_headerTwo = null;
        // Clear the previous run's merged header so processStreaming regenerates key indexes.
        m_mergedHeader = null;
        m_firstFinished = false;
        m_secondFinished = false;
        m_firstIsWaiting = false;
        m_secondIsWaiting = false;
        m_runningIncrementally = false;
        if (getStepManager().numIncomingConnections() < 2) {
            throw new WekaException("Two incoming connections are required for the Join step");
        }
        establishFirstAndSecondConnectedInputs();
    }
}
