Skip to content
Open
1 change: 1 addition & 0 deletions src/antlr/Parser.g
Original file line number Diff line number Diff line change
Expand Up @@ -845,6 +845,7 @@ txnColumnCondition[List<ConditionStatement.Raw> conditions]
| K_NULL { conditions.add(new ConditionStatement.Raw(lhs, ConditionStatement.Kind.IS_NULL, null)); }
)
| (txnConditionKind term)=> op=txnConditionKind t=term { conditions.add(new ConditionStatement.Raw(lhs, op, t)); }
| (txnConditionKind rowDataReference)=> op=txnConditionKind rhs=rowDataReference { conditions.add(new ConditionStatement.Raw(lhs, op, rhs)); }
)
| lhs=term op=txnConditionKind rhs=rowDataReference { conditions.add(new ConditionStatement.Raw(lhs, op, rhs)); }
;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.cassandra.cql3.QueryOptions;
import org.apache.cassandra.cql3.VariableSpecifications;
import org.apache.cassandra.cql3.terms.Term;
import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.service.accord.txn.TxnCondition;
import org.apache.cassandra.service.accord.txn.TxnReference;

Expand All @@ -41,7 +42,7 @@ public enum Kind
GTE(TxnCondition.Kind.GREATER_THAN_OR_EQUAL, TxnCondition.Kind.LESS_THAN_OR_EQUAL),
LT(TxnCondition.Kind.LESS_THAN, TxnCondition.Kind.GREATER_THAN),
LTE(TxnCondition.Kind.LESS_THAN_OR_EQUAL, TxnCondition.Kind.GREATER_THAN_OR_EQUAL);

// TODO: Support for IN, CONTAINS, CONTAINS KEY

private final TxnCondition.Kind kind;
Expand Down Expand Up @@ -100,8 +101,19 @@ public ConditionStatement prepare(String keyspace, VariableSpecifications bindVa
RowDataReference reference;
Term value;
boolean reversed = false;

if (lhs instanceof RowDataReference.Raw)

if (lhs instanceof RowDataReference.Raw && rhs instanceof RowDataReference.Raw)
{
if (((RowDataReference.Raw) lhs).column() == null)
throw new IllegalStateException(String.format("Row reference (%s) can only be used with IS NULL/IS NOT NULL conditions", lhs.getText()));
if (((RowDataReference.Raw) rhs).column() == null)
throw new IllegalStateException(String.format("Row reference (%s) can only be used with IS NULL/IS NOT NULL conditions", rhs.getText()));
reference = ((RowDataReference.Raw) lhs).prepareAsReceiver();
value = ((RowDataReference.Raw) rhs).prepareAsReceiver();
if (!reference.toResultMetadata().type.equals(((RowDataReference) value).toResultMetadata().type))
throw new InvalidRequestException(String.format("Row reference (%s) must have the same type as row reference (%s)", lhs.getText(), rhs.getText()));
}
else if (lhs instanceof RowDataReference.Raw)
{
if (((RowDataReference.Raw) lhs).column() == null)
throw new IllegalStateException(String.format("Row reference (%s) can only be used with IS NULL/IS NOT NULL conditions", lhs.getText()));
Expand Down Expand Up @@ -143,10 +155,19 @@ public TxnCondition createCondition(QueryOptions options)
case GTE:
case LT:
case LTE:
// TODO: Support for references on LHS and RHS
TxnReference ref = reference.toTxnReference(options);
checkTrue(ref.kind == TxnReference.Kind.COLUMN, "Condition %s requires COLUMN reference but given %s", kind, ref.kind);
return new TxnCondition.Value(ref.asColumn(),
TxnReference refLHS = reference.toTxnReference(options);
checkTrue(refLHS.kind == TxnReference.Kind.COLUMN, "Condition %s requires COLUMN reference but given %s", kind, refLHS.kind);
if (value instanceof RowDataReference)
{
TxnReference refRHS = ((RowDataReference) value).toTxnReference(options);
checkTrue(refRHS.kind == TxnReference.Kind.COLUMN, "Condition %s requires COLUMN reference but given %s", kind, refRHS.kind);
return new TxnCondition.Reference(refLHS.asColumn(),
kind.toTxnKind(reversed),
refRHS.asColumn(),
options.getProtocolVersion());
}

return new TxnCondition.Value(refLHS.asColumn(),
kind.toTxnKind(reversed),
value.bindAndGet(options),
options.getProtocolVersion());
Expand Down
171 changes: 165 additions & 6 deletions src/java/org/apache/cassandra/service/accord/txn/TxnCondition.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@
import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.CollectionType;
import org.apache.cassandra.db.marshal.ListType;
import org.apache.cassandra.db.marshal.MapType;
import org.apache.cassandra.db.marshal.SetType;
import org.apache.cassandra.db.marshal.UserType;
import org.apache.cassandra.db.partitions.FilteredPartition;
import org.apache.cassandra.db.rows.Cell;
Expand Down Expand Up @@ -618,6 +621,125 @@ public long serializedSize(Value condition, TableMetadatas tables)
};
}

public static class Reference extends TxnCondition
{
private static final EnumSet<Kind> KINDS = EnumSet.of(Kind.EQUAL, Kind.NOT_EQUAL,
Kind.GREATER_THAN, Kind.GREATER_THAN_OR_EQUAL,
Kind.LESS_THAN, Kind.LESS_THAN_OR_EQUAL);

private final TxnReference.ColumnReference referenceLHS;
private final TxnReference.ColumnReference referenceRHS;
private final ProtocolVersion version;

public Reference(TxnReference.ColumnReference referenceLHS, Kind kind, TxnReference.ColumnReference referenceRHS, ProtocolVersion version)
{
super(kind);
Invariants.requireArgument(KINDS.contains(kind), "Kind " + kind + " cannot be used with a value condition");
this.referenceLHS = referenceLHS;
this.referenceRHS = referenceRHS;
this.version = version;
}

public static EnumSet<Kind> supported()
{
return EnumSet.copyOf(KINDS);
}

@Override
public boolean equals(Object o)
{
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
if (!super.equals(o)) return false;
Reference reference1 = (Reference) o;
return referenceLHS.equals(reference1.referenceLHS) && referenceRHS.equals(reference1.referenceRHS);
}

@Override
public void collect(TableMetadatas.Collector collector)
{
referenceLHS.collect(collector);
referenceRHS.collect(collector);
}

@Override
public int hashCode()
{
return Objects.hash(super.hashCode(), referenceLHS, referenceRHS);
}

@Override
public String toString()
{
return referenceLHS.toString() + ' ' + kind.symbol + ' ' + referenceRHS.toString();
}

public AbstractType<?> getColumnType(TxnReference.ColumnReference reference)
{
ColumnMetadata column = reference.column();
if (reference.isElementSelection())
{
if (column.type instanceof ListType)
return ((ListType<?>) column.type).valueComparator();
else if (column.type instanceof SetType)
return ((SetType<?>) column.type).nameComparator();
else if (column.type instanceof MapType)
return ((MapType<?, ?>) column.type).valueComparator();
}
else if (reference.isFieldSelection())
{
return reference.getFieldSelectionType();
}

return column.type;
}

@Override
public boolean applies(TxnData data)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this isn't correct; there is a reason why Value doesn't do this as this is just SimpleBounds. row1.foo.bar and row1.foo['key'] are not correct I believe

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IF row1.user.id == row2.user.id

the above is actually the following in your code

IF row1.user == row2.user

as the toByteBuffer takes all complex types and returns the top level type, and not what is actually referenced

{
AbstractType<?> typeLHS = getColumnType(referenceLHS);
AbstractType<?> typeRHS = getColumnType(referenceRHS);

ByteBuffer lhs = referenceLHS.toByteBuffer(data, typeLHS);
ByteBuffer rhs = referenceRHS.toByteBuffer(data, typeRHS);

if (lhs == null || rhs == null)
return false;

return kind.operator.isSatisfiedBy(typeLHS, lhs, rhs);
}

private static final ConditionSerializer<Reference> serializer = new ConditionSerializer<>()
{
@Override
public void serialize(Reference condition, TableMetadatas tables, DataOutputPlus out) throws IOException
{
TxnReference.serializer.serialize(condition.referenceLHS, tables, out);
TxnReference.serializer.serialize(condition.referenceRHS, tables, out);
out.writeUTF(condition.version.name());
}

@Override
public Reference deserialize(TableMetadatas tables, DataInputPlus in, Kind kind) throws IOException
{
TxnReference.ColumnReference referenceLHS = TxnReference.serializer.deserialize(tables, in).asColumn();
TxnReference.ColumnReference referenceRHS = TxnReference.serializer.deserialize(tables, in).asColumn();
ProtocolVersion protocolVersion = ProtocolVersion.valueOf(in.readUTF());
return new Reference(referenceLHS, kind, referenceRHS, protocolVersion);
}

@Override
public long serializedSize(Reference condition, TableMetadatas tables)
{
long size = 0;
size += TxnReference.serializer.serializedSize(condition.referenceLHS, tables);
size += TxnReference.serializer.serializedSize(condition.referenceRHS, tables);
size += TypeSizes.sizeof(condition.version.name());
return size;
}
};
}

public static class BooleanGroup extends TxnCondition
{
private static final Set<Kind> KINDS = ImmutableSet.of(Kind.AND, Kind.OR);
Expand Down Expand Up @@ -698,27 +820,64 @@ public long serializedSize(BooleanGroup condition, TableMetadatas tables)

public static final ParameterisedUnversionedSerializer<TxnCondition, TableMetadatas> serializer = new ParameterisedUnversionedSerializer<>()
{
// TOP_BIT is used to differentiate between Value.Serializer and Reference.Serialzer,
// in order to implement comparison between LET variables.
// The reason we use TOP_BIT is to support users who have been deploying off trunk
// to upgrade nodes without breaking them. Upgrading is safe under the following assumptions:
// 1) `ref op ref` feature is only used after all nodes have been upgraded
// 2) cluster can be mixed mode as long as `ref op ref` is not used
// If a user tries to use `ref op ref` in a mixed mode this will lead to undefined errors,
// where the only recovery process is to force older nodes to upgrade
// See CASSANDRA-21458
private static final int TOP_BIT = 0x40000000;

@SuppressWarnings("unchecked")
@Override
public void serialize(TxnCondition condition, TableMetadatas tables, DataOutputPlus out) throws IOException
{
out.writeUnsignedVInt32(condition.kind.ordinal());
condition.kind.serializer().serialize(condition, tables, out);
if (condition instanceof Reference)
{
out.writeUnsignedVInt32(condition.kind.ordinal() | TOP_BIT);
Reference.serializer.serialize((Reference) condition, tables, out);
}
else
{
out.writeUnsignedVInt32(condition.kind.ordinal());
condition.kind.serializer().serialize(condition, tables, out);
}
}

@Override
public TxnCondition deserialize(TableMetadatas tables, DataInputPlus in) throws IOException
{
Kind kind = Kind.values()[in.readUnsignedVInt32()];
return kind.serializer().deserialize(tables, in, kind);
int flag = in.readUnsignedVInt32();
if ((flag & TOP_BIT) != 0)
{
Kind kind = Kind.values()[flag ^ TOP_BIT];
return Reference.serializer.deserialize(tables, in, kind);
}
else
{
Kind kind = Kind.values()[flag];
return kind.serializer().deserialize(tables, in, kind);
}
}

@SuppressWarnings("unchecked")
@Override
public long serializedSize(TxnCondition condition, TableMetadatas tables)
{
long size = TypeSizes.sizeofUnsignedVInt(condition.kind.ordinal());
size += condition.kind.serializer().serializedSize(condition, tables);
long size;
if (condition instanceof Reference)
{
size = TypeSizes.sizeofUnsignedVInt(condition.kind.ordinal() | TOP_BIT);
size += Reference.serializer.serializedSize((Reference) condition, tables);
}
else
{
size = TypeSizes.sizeofUnsignedVInt(condition.kind.ordinal());
size += condition.kind.serializer().serializedSize(condition, tables);
}
return size;
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ public static <T> void serializeNullable(T value, DataOutputPlus out, Unversione
if (value != null)
serializer.serialize(value, out);
}

public static <T> void serializeNullable(T value, DataOutputPlus out, int version, IVersionedSerializer<T> serializer) throws IOException
{
out.writeBoolean(value != null);
Expand Down
Loading