From bfd1829c04d1a9cbd3473c4e7a8282e123841d77 Mon Sep 17 00:00:00 2001 From: Astha_0524857 Date: Sat, 20 Jun 2026 11:47:14 +0200 Subject: [PATCH 1/3] setup project --- hello.dml | 1 + .../ooc/CentralMomentOOCInstruction.java | 147 +++++++++--------- 2 files changed, 74 insertions(+), 74 deletions(-) create mode 100644 hello.dml diff --git a/hello.dml b/hello.dml new file mode 100644 index 00000000000..7df869a15e7 --- /dev/null +++ b/hello.dml @@ -0,0 +1 @@ +print("Hello, World!") diff --git a/src/main/java/org/apache/sysds/runtime/instructions/ooc/CentralMomentOOCInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/ooc/CentralMomentOOCInstruction.java index 1c73b636341..f642ec3503a 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/ooc/CentralMomentOOCInstruction.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/ooc/CentralMomentOOCInstruction.java @@ -35,78 +35,77 @@ public class CentralMomentOOCInstruction extends AggregateUnaryOOCInstruction { - private CentralMomentOOCInstruction(CMOperator cm, CPOperand in1, CPOperand in2, CPOperand in3, CPOperand out, - String opcode, String str) { - super(OOCType.CM, cm, in1, in2, in3, out, opcode, str); - } - - public static CentralMomentOOCInstruction parseInstruction(String str) { - CentralMomentCPInstruction cpInst = CentralMomentCPInstruction.parseInstruction(str); - return parseInstruction(cpInst); - } - - public static CentralMomentOOCInstruction parseInstruction(CentralMomentCPInstruction inst) { - return new CentralMomentOOCInstruction((CMOperator) inst.getOperator(), inst.input1, inst.input2, inst.input3, - inst.output, inst.getOpcode(), inst.getInstructionString()); - } - - @Override - public void processInstruction(ExecutionContext ec) { - String output_name = output.getName(); - - /* - * The "order" of the central moment in the instruction can - * be set to INVALID when the exact value is unknown at - * compilation time. We first need to determine the exact - * order and update the CMOperator, if needed. - */ - - MatrixObject matObj = ec.getMatrixObject(input1.getName()); - OOCStream qIn = matObj.getStreamHandle(); - - CPOperand scalarInput = (input3 == null ? input2 : input3); - ScalarObject order = ec.getScalarInput(scalarInput); - - CMOperator cm_op = ((CMOperator) _optr); - if(cm_op.getAggOpType() == CMOperator.AggregateOperationTypes.INVALID) - cm_op = cm_op.setCMAggOp((int) order.getLongValue()); - - CMOperator finalCm_op = cm_op; - - OOCStream cmObjs = createWritableStream(); - - if(input3 == null) { - mapOOC(qIn, cmObjs, tmp -> ((MatrixBlock) tmp.getValue()).cmOperations(new CMOperator(finalCm_op))); // Need to copy CMOperator as its ValueFunction is stateful - } - else { - // Here we use a hash join approach - // Note that this may keep blocks in the cache for a while, depending on when a matching block arrives in the stream - MatrixObject wtObj = ec.getMatrixObject(input2.getName()); - - DataCharacteristics dc = ec.getDataCharacteristics(input1.getName()); - DataCharacteristics dcW = ec.getDataCharacteristics(input2.getName()); - - if (dc.getBlocksize() != dcW.getBlocksize()) - throw new DMLRuntimeException("Different block sizes are not yet supported"); - - OOCStream wIn = wtObj.getStreamHandle(); - - joinOOC(qIn, wIn, cmObjs, - (tmp, weights) -> - ((MatrixBlock) tmp.getValue()).cmOperations(new CMOperator(finalCm_op), (MatrixBlock) weights.getValue()), - IndexedMatrixValue::getIndexes); - } - - try { - CmCovObject agg = cmObjs.dequeue(); - CmCovObject next; - - while ((next = cmObjs.dequeue()) != LocalTaskQueue.NO_MORE_TASKS) - agg = (CmCovObject) finalCm_op.fn.execute(agg, next); - - ec.setScalarOutput(output_name, new DoubleObject(agg.getRequiredResult(finalCm_op))); - } catch (Exception ex) { - throw new DMLRuntimeException(ex); - } - } + private CentralMomentOOCInstruction(CMOperator cm, CPOperand in1, CPOperand in2, CPOperand in3, CPOperand out, + String opcode, String str) { + super(OOCType.CM, cm, in1, in2, in3, out, opcode, str); + } + + public static CentralMomentOOCInstruction parseInstruction(String str) { + CentralMomentCPInstruction cpInst = CentralMomentCPInstruction.parseInstruction(str); + return parseInstruction(cpInst); + } + + public static CentralMomentOOCInstruction parseInstruction(CentralMomentCPInstruction inst) { + return new CentralMomentOOCInstruction((CMOperator) inst.getOperator(), inst.input1, inst.input2, inst.input3, + inst.output, inst.getOpcode(), inst.getInstructionString()); + } + + @Override + public void processInstruction(ExecutionContext ec) { + String output_name = output.getName(); + + /* + * The "order" of the central moment in the instruction can + * be set to INVALID when the exact value is unknown at + * compilation time. We first need to determine the exact + * order and update the CMOperator, if needed. + */ + + MatrixObject matObj = ec.getMatrixObject(input1.getName()); + OOCStream qIn = matObj.getStreamHandle(); + + CPOperand scalarInput = (input3 == null ? input2 : input3); + ScalarObject order = ec.getScalarInput(scalarInput); + + CMOperator cm_op = ((CMOperator) _optr); + if (cm_op.getAggOpType() == CMOperator.AggregateOperationTypes.INVALID) + cm_op = cm_op.setCMAggOp((int) order.getLongValue()); + + CMOperator finalCm_op = cm_op; + + OOCStream cmObjs = createWritableStream(); + + if (input3 == null) { + mapOOC(qIn, cmObjs, tmp -> ((MatrixBlock) tmp.getValue()).cmOperations(new CMOperator(finalCm_op))); // Need to copy CMOperator as its ValueFunction is stateful + } else { + // Here we use a hash join approach + // Note that this may keep blocks in the cache for a while, depending on when a matching block arrives in the stream + MatrixObject wtObj = ec.getMatrixObject(input2.getName()); + + DataCharacteristics dc = ec.getDataCharacteristics(input1.getName()); + DataCharacteristics dcW = ec.getDataCharacteristics(input2.getName()); + + if (dc.getBlocksize() != dcW.getBlocksize()) + throw new DMLRuntimeException("Different block sizes are not yet supported"); + + OOCStream wIn = wtObj.getStreamHandle(); + + joinOOC(qIn, wIn, cmObjs, + (tmp, weights) -> + ((MatrixBlock) tmp.getValue()).cmOperations(new CMOperator(finalCm_op), (MatrixBlock) weights.getValue()), + IndexedMatrixValue::getIndexes); + } + + try { + CmCovObject agg = cmObjs.dequeue(); + CmCovObject next; + + while ((next = cmObjs.dequeue()) != LocalTaskQueue.NO_MORE_TASKS) + agg = (CmCovObject) finalCm_op.fn.execute(agg, next); + + ec.setScalarOutput(output_name, new DoubleObject(agg.getRequiredResult(finalCm_op))); + } catch (Exception ex) { + throw new DMLRuntimeException(ex); + } + } } From d3e84a23f08ea632c036b96e1136da7b9a3b2c7e Mon Sep 17 00:00:00 2001 From: Astha_0524857 Date: Sat, 20 Jun 2026 12:53:31 +0200 Subject: [PATCH 2/3] test: add failing OOC covariance tests --- .../test/functions/ooc/CovarianceTest.java | 130 ++++++++++++++++++ src/test/scripts/functions/ooc/Covariance.dml | 29 ++++ 2 files changed, 159 insertions(+) create mode 100644 src/test/java/org/apache/sysds/test/functions/ooc/CovarianceTest.java create mode 100644 src/test/scripts/functions/ooc/Covariance.dml diff --git a/src/test/java/org/apache/sysds/test/functions/ooc/CovarianceTest.java b/src/test/java/org/apache/sysds/test/functions/ooc/CovarianceTest.java new file mode 100644 index 00000000000..34ed0a5b741 --- /dev/null +++ b/src/test/java/org/apache/sysds/test/functions/ooc/CovarianceTest.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysds.test.functions.ooc; + +import java.io.IOException; + +import org.apache.sysds.common.Types; +import org.apache.sysds.runtime.io.MatrixWriter; +import org.apache.sysds.runtime.io.MatrixWriterFactory; +import org.apache.sysds.runtime.matrix.data.MatrixBlock; +import org.apache.sysds.runtime.meta.MatrixCharacteristics; +import org.apache.sysds.runtime.util.DataConverter; +import org.apache.sysds.runtime.util.HDFSTool; +import org.apache.sysds.test.AutomatedTestBase; +import org.apache.sysds.test.TestConfiguration; +import org.apache.sysds.test.TestUtils; +import org.junit.Test; + +public class CovarianceTest extends AutomatedTestBase { + private final static String TEST_NAME = "Covariance"; + private final static String TEST_DIR = "functions/ooc/"; + private final static String TEST_CLASS_DIR = TEST_DIR + CovarianceTest.class.getSimpleName() + "/"; + private final static double eps = 1e-10; + + private final static String INPUT_A = "A"; + private final static String INPUT_B = "B"; + private final static String OUTPUT_CP = "R_CP"; + private final static String OUTPUT_OOC = "R_OOC"; + + private final static int rows = 1871; + private final static int cols = 1; + private final static int blocksize = 1000; + private final static int maxVal = 7; + + private final static double denseSparsity = 0.65; + private final static double sparseSparsity = 0.05; + + @Override + public void setUp() { + TestUtils.clearAssertionInformation(); + addTestConfiguration(TEST_NAME, + new TestConfiguration(TEST_CLASS_DIR, TEST_NAME, new String[] { OUTPUT_CP, OUTPUT_OOC })); + } + + @Test + public void testCovarianceDenseOOC() { + runCovarianceOOCCompareTest(false); + } + + @Test + public void testCovarianceSparseOOC() { + runCovarianceOOCCompareTest(true); + } + + private void runCovarianceOOCCompareTest(boolean sparse) { + Types.ExecMode platformOld = setExecMode(Types.ExecMode.SINGLE_NODE); + + try { + getAndLoadTestConfiguration(TEST_NAME); + + String HOME = SCRIPT_DIR + TEST_DIR; + fullDMLScriptName = HOME + TEST_NAME + ".dml"; + + double sparsity = sparse ? sparseSparsity : denseSparsity; + + double[][] A = getRandomMatrix(rows, cols, 1, maxVal, sparsity, 7); + double[][] B = getRandomMatrix(rows, cols, 1, maxVal, sparsity, 823); + + MatrixBlock ABlock = DataConverter.convertToMatrixBlock(A); + MatrixBlock BBlock = DataConverter.convertToMatrixBlock(B); + + writeBinaryMatrix(INPUT_A, ABlock, rows, cols, blocksize); + writeBinaryMatrix(INPUT_B, BBlock, rows, cols, blocksize); + + // Reference run: normal single-node CP execution. + programArgs = new String[] { + "-args", input(INPUT_A), input(INPUT_B), output(OUTPUT_CP) + }; + runTest(true, false, null, -1); + + // OOC run: this is expected to fail in Week 1 until covariance OOC support is implemented. + programArgs = new String[] { + "-explain", "-stats", "-ooc", + "-args", input(INPUT_A), input(INPUT_B), output(OUTPUT_OOC) + }; + runTest(true, false, null, -1); + + MatrixBlock cpResult = DataConverter.readMatrixFromHDFS( + output(OUTPUT_CP), Types.FileFormat.BINARY, 1, 1, blocksize, 1); + + MatrixBlock oocResult = DataConverter.readMatrixFromHDFS( + output(OUTPUT_OOC), Types.FileFormat.BINARY, 1, 1, blocksize, 1); + + TestUtils.compareMatrices(cpResult, oocResult, eps); + } + catch(IOException ex) { + throw new RuntimeException(ex); + } + finally { + resetExecMode(platformOld); + } + } + + private void writeBinaryMatrix(String name, MatrixBlock mb, int rows, int cols, int blocksize) throws IOException { + MatrixWriter writer = MatrixWriterFactory.createMatrixWriter(Types.FileFormat.BINARY); + writer.writeMatrixToHDFS(mb, input(name), rows, cols, blocksize, mb.getNonZeros()); + + HDFSTool.writeMetaDataFile(input(name + ".mtd"), + Types.ValueType.FP64, + new MatrixCharacteristics(rows, cols, blocksize, mb.getNonZeros()), + Types.FileFormat.BINARY); + } +} \ No newline at end of file diff --git a/src/test/scripts/functions/ooc/Covariance.dml b/src/test/scripts/functions/ooc/Covariance.dml new file mode 100644 index 00000000000..da051a86245 --- /dev/null +++ b/src/test/scripts/functions/ooc/Covariance.dml @@ -0,0 +1,29 @@ +#------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +#------------------------------------------------------------- + +A = read($1); +B = read($2); + +s = cov(A, B); +res = as.matrix(s); + +write(res, $3, format="binary"); + From 9ce3720b034e3a8926c985d40a24230d655b5c8d Mon Sep 17 00:00:00 2001 From: Astha_0524857 Date: Sat, 20 Jun 2026 13:01:53 +0200 Subject: [PATCH 3/3] bench: define CP versus OOC benchmark plan --- dev/ooc-benchmark-plan.md | 43 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 43 insertions(+) create mode 100644 dev/ooc-benchmark-plan.md diff --git a/dev/ooc-benchmark-plan.md b/dev/ooc-benchmark-plan.md new file mode 100644 index 00000000000..bd87c24c064 --- /dev/null +++ b/dev/ooc-benchmark-plan.md @@ -0,0 +1,43 @@ +# OOC Covariance and TSMM Benchmark Plan + +## Scope + +Benchmarks compare local CP execution against local OOC execution only. + +## Operators + +- cov(A, B) +- cov(A, B, W) +- t(X) %*% X +- X %*% t(X) + +## Benchmark Variables + +- Dense and sparse inputs +- Weighted and unweighted covariance +- TSMM LEFT: t(X) %*% X +- TSMM RIGHT: X %*% t(X) +- Single-tile and multi-tile TSMM outputs +- Block sizes: 500, 1000, and 2000 where feasible +- Warm-up runs before measured runs + +## Initial Matrix Plan + +| Operator | Case | Matrix size | Sparsity | Block size | Comparison | +|---|---:|---:|---:|---:|---| +| cov(A,B) | dense | 10000 x 1 | 0.9 | 1000 | CP vs OOC | +| cov(A,B) | sparse | 10000 x 1 | 0.1 | 1000 | CP vs OOC | +| cov(A,B,W) | dense | 10000 x 1 | 0.9 | 1000 | CP vs OOC | +| cov(A,B,W) | sparse | 10000 x 1 | 0.1 | 1000 | CP vs OOC | +| t(X)%*%X | LEFT single-tile | 10000 x 100 | 0.9 | 1000 | CP vs OOC | +| t(X)%*%X | LEFT multi-tile | 10000 x 3000 | 0.2 | 1000 | CP vs OOC | +| X%*%t(X) | RIGHT single-tile | 100 x 10000 | 0.9 | 1000 | CP vs OOC | +| X%*%t(X) | RIGHT multi-tile | 3000 x 100 | 0.2 | 1000 | CP vs OOC | + +## Measurement Plan + +- 1 or 2 warm-up runs +- 3 measured runs +- Report average runtime +- Report matrix dimensions, sparsity, block size, execution mode, and operator +- Verify correctness against CP output before interpreting runtime