Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions dev/ooc-benchmark-plan.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# OOC Covariance and TSMM Benchmark Plan

## Scope

Benchmarks compare local CP (control program) execution against local OOC (out-of-core) execution only; no other execution modes are in scope.

## Operators

- cov(A, B)
- cov(A, B, W)
- t(X) %*% X
- X %*% t(X)

## Benchmark Variables

- Dense and sparse inputs
- Weighted and unweighted covariance
- TSMM LEFT: t(X) %*% X
- TSMM RIGHT: X %*% t(X)
- Single-tile and multi-tile TSMM outputs
- Block sizes: 500, 1000, and 2000 where feasible
- Warm-up runs before measured runs

## Initial Matrix Plan

| Operator | Case | Matrix size (rows x cols) | Sparsity (fraction of non-zeros) | Block size | Comparison |
|---|---:|---:|---:|---:|---|
| cov(A,B) | dense | 10000 x 1 | 0.9 | 1000 | CP vs OOC |
| cov(A,B) | sparse | 10000 x 1 | 0.1 | 1000 | CP vs OOC |
| cov(A,B,W) | dense | 10000 x 1 | 0.9 | 1000 | CP vs OOC |
| cov(A,B,W) | sparse | 10000 x 1 | 0.1 | 1000 | CP vs OOC |
| t(X)%*%X | LEFT single-tile | 10000 x 100 | 0.9 | 1000 | CP vs OOC |
| t(X)%*%X | LEFT multi-tile | 10000 x 3000 | 0.2 | 1000 | CP vs OOC |
| X%*%t(X) | RIGHT single-tile | 100 x 10000 | 0.9 | 1000 | CP vs OOC |
| X%*%t(X) | RIGHT multi-tile | 3000 x 100 | 0.2 | 1000 | CP vs OOC |

## Measurement Plan

- 1 or 2 warm-up runs, excluded from the reported timings
- 3 measured runs
- Report the average runtime over the measured runs
- Report matrix dimensions, sparsity, block size, execution mode, and operator
- Verify that the OOC result matches the CP output before interpreting any runtime numbers
1 change: 1 addition & 0 deletions hello.dml
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# Minimal DML script: prints a fixed greeting.
print("Hello, World!")
Original file line number Diff line number Diff line change
Expand Up @@ -35,78 +35,77 @@

public class CentralMomentOOCInstruction extends AggregateUnaryOOCInstruction {

private CentralMomentOOCInstruction(CMOperator cm, CPOperand in1, CPOperand in2, CPOperand in3, CPOperand out,
String opcode, String str) {
super(OOCType.CM, cm, in1, in2, in3, out, opcode, str);
}

public static CentralMomentOOCInstruction parseInstruction(String str) {
CentralMomentCPInstruction cpInst = CentralMomentCPInstruction.parseInstruction(str);
return parseInstruction(cpInst);
}

public static CentralMomentOOCInstruction parseInstruction(CentralMomentCPInstruction inst) {
return new CentralMomentOOCInstruction((CMOperator) inst.getOperator(), inst.input1, inst.input2, inst.input3,
inst.output, inst.getOpcode(), inst.getInstructionString());
}

@Override
public void processInstruction(ExecutionContext ec) {
String output_name = output.getName();

/*
* The "order" of the central moment in the instruction can
* be set to INVALID when the exact value is unknown at
* compilation time. We first need to determine the exact
* order and update the CMOperator, if needed.
*/

MatrixObject matObj = ec.getMatrixObject(input1.getName());
OOCStream<IndexedMatrixValue> qIn = matObj.getStreamHandle();

CPOperand scalarInput = (input3 == null ? input2 : input3);
ScalarObject order = ec.getScalarInput(scalarInput);

CMOperator cm_op = ((CMOperator) _optr);
if(cm_op.getAggOpType() == CMOperator.AggregateOperationTypes.INVALID)
cm_op = cm_op.setCMAggOp((int) order.getLongValue());

CMOperator finalCm_op = cm_op;

OOCStream<CmCovObject> cmObjs = createWritableStream();

if(input3 == null) {
mapOOC(qIn, cmObjs, tmp -> ((MatrixBlock) tmp.getValue()).cmOperations(new CMOperator(finalCm_op))); // Need to copy CMOperator as its ValueFunction is stateful
}
else {
// Here we use a hash join approach
// Note that this may keep blocks in the cache for a while, depending on when a matching block arrives in the stream
MatrixObject wtObj = ec.getMatrixObject(input2.getName());

DataCharacteristics dc = ec.getDataCharacteristics(input1.getName());
DataCharacteristics dcW = ec.getDataCharacteristics(input2.getName());

if (dc.getBlocksize() != dcW.getBlocksize())
throw new DMLRuntimeException("Different block sizes are not yet supported");

OOCStream<IndexedMatrixValue> wIn = wtObj.getStreamHandle();

joinOOC(qIn, wIn, cmObjs,
(tmp, weights) ->
((MatrixBlock) tmp.getValue()).cmOperations(new CMOperator(finalCm_op), (MatrixBlock) weights.getValue()),
IndexedMatrixValue::getIndexes);
}

try {
CmCovObject agg = cmObjs.dequeue();
CmCovObject next;

while ((next = cmObjs.dequeue()) != LocalTaskQueue.NO_MORE_TASKS)
agg = (CmCovObject) finalCm_op.fn.execute(agg, next);

ec.setScalarOutput(output_name, new DoubleObject(agg.getRequiredResult(finalCm_op)));
} catch (Exception ex) {
throw new DMLRuntimeException(ex);
}
}
private CentralMomentOOCInstruction(CMOperator cm, CPOperand in1, CPOperand in2, CPOperand in3, CPOperand out,
String opcode, String str) {
super(OOCType.CM, cm, in1, in2, in3, out, opcode, str);
}

public static CentralMomentOOCInstruction parseInstruction(String str) {
CentralMomentCPInstruction cpInst = CentralMomentCPInstruction.parseInstruction(str);
return parseInstruction(cpInst);
}

public static CentralMomentOOCInstruction parseInstruction(CentralMomentCPInstruction inst) {
return new CentralMomentOOCInstruction((CMOperator) inst.getOperator(), inst.input1, inst.input2, inst.input3,
inst.output, inst.getOpcode(), inst.getInstructionString());
}

@Override
public void processInstruction(ExecutionContext ec) {
String output_name = output.getName();

/*
* The "order" of the central moment in the instruction can
* be set to INVALID when the exact value is unknown at
* compilation time. We first need to determine the exact
* order and update the CMOperator, if needed.
*/

MatrixObject matObj = ec.getMatrixObject(input1.getName());
OOCStream<IndexedMatrixValue> qIn = matObj.getStreamHandle();

CPOperand scalarInput = (input3 == null ? input2 : input3);
ScalarObject order = ec.getScalarInput(scalarInput);

CMOperator cm_op = ((CMOperator) _optr);
if (cm_op.getAggOpType() == CMOperator.AggregateOperationTypes.INVALID)
cm_op = cm_op.setCMAggOp((int) order.getLongValue());

CMOperator finalCm_op = cm_op;

OOCStream<CmCovObject> cmObjs = createWritableStream();

if (input3 == null) {
mapOOC(qIn, cmObjs, tmp -> ((MatrixBlock) tmp.getValue()).cmOperations(new CMOperator(finalCm_op))); // Need to copy CMOperator as its ValueFunction is stateful
} else {
// Here we use a hash join approach
// Note that this may keep blocks in the cache for a while, depending on when a matching block arrives in the stream
MatrixObject wtObj = ec.getMatrixObject(input2.getName());

DataCharacteristics dc = ec.getDataCharacteristics(input1.getName());
DataCharacteristics dcW = ec.getDataCharacteristics(input2.getName());

if (dc.getBlocksize() != dcW.getBlocksize())
throw new DMLRuntimeException("Different block sizes are not yet supported");

OOCStream<IndexedMatrixValue> wIn = wtObj.getStreamHandle();

joinOOC(qIn, wIn, cmObjs,
(tmp, weights) ->
((MatrixBlock) tmp.getValue()).cmOperations(new CMOperator(finalCm_op), (MatrixBlock) weights.getValue()),
IndexedMatrixValue::getIndexes);
}

try {
CmCovObject agg = cmObjs.dequeue();
CmCovObject next;

while ((next = cmObjs.dequeue()) != LocalTaskQueue.NO_MORE_TASKS)
agg = (CmCovObject) finalCm_op.fn.execute(agg, next);

ec.setScalarOutput(output_name, new DoubleObject(agg.getRequiredResult(finalCm_op)));
} catch (Exception ex) {
throw new DMLRuntimeException(ex);
}
}
}
130 changes: 130 additions & 0 deletions src/test/java/org/apache/sysds/test/functions/ooc/CovarianceTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.sysds.test.functions.ooc;

import java.io.IOException;

import org.apache.sysds.common.Types;
import org.apache.sysds.runtime.io.MatrixWriter;
import org.apache.sysds.runtime.io.MatrixWriterFactory;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
import org.apache.sysds.runtime.meta.MatrixCharacteristics;
import org.apache.sysds.runtime.util.DataConverter;
import org.apache.sysds.runtime.util.HDFSTool;
import org.apache.sysds.test.AutomatedTestBase;
import org.apache.sysds.test.TestConfiguration;
import org.apache.sysds.test.TestUtils;
import org.junit.Test;

/**
 * Runs the {@code Covariance} DML script twice on the same binary inputs, once as a regular
 * single-node run and once with the {@code -ooc} flag, and checks that both 1x1 results agree
 * within {@code eps}.
 */
public class CovarianceTest extends AutomatedTestBase {
	private static final String TEST_NAME = "Covariance";
	private static final String TEST_DIR = "functions/ooc/";
	private static final String TEST_CLASS_DIR = TEST_DIR + CovarianceTest.class.getSimpleName() + "/";
	private static final double eps = 1e-10;

	private static final String INPUT_A = "A";
	private static final String INPUT_B = "B";
	private static final String OUTPUT_CP = "R_CP";
	private static final String OUTPUT_OOC = "R_OOC";

	// Input shape: a single column of 1871 rows, written with a block size of 1000.
	private static final int rows = 1871;
	private static final int cols = 1;
	private static final int blocksize = 1000;
	private static final int maxVal = 7;

	private static final double denseSparsity = 0.65;
	private static final double sparseSparsity = 0.05;

	@Override
	public void setUp() {
		TestUtils.clearAssertionInformation();
		TestConfiguration config = new TestConfiguration(TEST_CLASS_DIR, TEST_NAME,
			new String[] { OUTPUT_CP, OUTPUT_OOC });
		addTestConfiguration(TEST_NAME, config);
	}

	@Test
	public void testCovarianceDenseOOC() {
		runCovarianceOOCCompareTest(false);
	}

	@Test
	public void testCovarianceSparseOOC() {
		runCovarianceOOCCompareTest(true);
	}

	/**
	 * Generates two random inputs, executes the script with and without {@code -ooc},
	 * and compares the two results.
	 *
	 * @param sparse if true the inputs are generated with {@code sparseSparsity}, otherwise with {@code denseSparsity}
	 */
	private void runCovarianceOOCCompareTest(boolean sparse) {
		final Types.ExecMode previousMode = setExecMode(Types.ExecMode.SINGLE_NODE);

		try {
			getAndLoadTestConfiguration(TEST_NAME);
			fullDMLScriptName = SCRIPT_DIR + TEST_DIR + TEST_NAME + ".dml";

			final double sparsity;
			if(sparse)
				sparsity = sparseSparsity;
			else
				sparsity = denseSparsity;

			// Fixed seeds (7 and 823) keep the generated inputs reproducible.
			double[][] dataA = getRandomMatrix(rows, cols, 1, maxVal, sparsity, 7);
			double[][] dataB = getRandomMatrix(rows, cols, 1, maxVal, sparsity, 823);

			MatrixBlock blockA = DataConverter.convertToMatrixBlock(dataA);
			MatrixBlock blockB = DataConverter.convertToMatrixBlock(dataB);

			writeBinaryMatrix(INPUT_A, blockA, rows, cols, blocksize);
			writeBinaryMatrix(INPUT_B, blockB, rows, cols, blocksize);

			// First run: reference result from normal single-node CP execution.
			runScript("-args", input(INPUT_A), input(INPUT_B), output(OUTPUT_CP));

			// Second run: same script and inputs with the -ooc flag (originally noted as
			// expected to fail until covariance OOC support is implemented).
			runScript("-explain", "-stats", "-ooc",
				"-args", input(INPUT_A), input(INPUT_B), output(OUTPUT_OOC));

			MatrixBlock expected = readResult(OUTPUT_CP);
			MatrixBlock actual = readResult(OUTPUT_OOC);

			TestUtils.compareMatrices(expected, actual, eps);
		}
		catch(IOException ex) {
			throw new RuntimeException(ex);
		}
		finally {
			resetExecMode(previousMode);
		}
	}

	/**
	 * Sets the program arguments and executes the configured script once.
	 *
	 * @param args command line arguments handed to the script run
	 */
	private void runScript(String... args) {
		programArgs = args;
		runTest(true, false, null, -1);
	}

	/**
	 * Reads a 1x1 binary result matrix from the test output directory.
	 *
	 * @param name output name
	 * @return the matrix read from that output
	 * @throws IOException if reading fails
	 */
	private MatrixBlock readResult(String name) throws IOException {
		return DataConverter.readMatrixFromHDFS(
			output(name), Types.FileFormat.BINARY, 1, 1, blocksize, 1);
	}

	/**
	 * Writes a matrix block in binary format to the test input directory, together with
	 * its {@code .mtd} metadata file.
	 *
	 * @param name input name
	 * @param mb   matrix block to write
	 * @param nrow number of rows
	 * @param ncol number of columns
	 * @param blen block size
	 * @throws IOException if writing fails
	 */
	private void writeBinaryMatrix(String name, MatrixBlock mb, int nrow, int ncol, int blen) throws IOException {
		MatrixWriter binaryWriter = MatrixWriterFactory.createMatrixWriter(Types.FileFormat.BINARY);
		binaryWriter.writeMatrixToHDFS(mb, input(name), nrow, ncol, blen, mb.getNonZeros());

		MatrixCharacteristics meta = new MatrixCharacteristics(nrow, ncol, blen, mb.getNonZeros());
		HDFSTool.writeMetaDataFile(input(name + ".mtd"), Types.ValueType.FP64, meta, Types.FileFormat.BINARY);
	}
}
29 changes: 29 additions & 0 deletions src/test/scripts/functions/ooc/Covariance.dml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
#-------------------------------------------------------------
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
#-------------------------------------------------------------

# Arguments: $1 and $2 are the two input matrices, $3 is the output location.
X = read($1);
Y = read($2);

# Convert the covariance result to a matrix before writing it out.
covXY = as.matrix(cov(X, Y));

write(covXY, $3, format="binary");