Skip to content

Commit 3723cd4

Browse files
authored
DGS-22899 Fix support for wrapped Avro unions (#402)
1 parent b4f49e1 commit 3723cd4

File tree

2 files changed

+133
-7
lines changed

2 files changed

+133
-7
lines changed

schemaregistry/serde/avro.ts

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -296,11 +296,15 @@ async function transform(ctx: RuleContext, schema: Type, msg: any, fieldTransfor
296296
switch (schema.typeName) {
297297
case 'union:unwrapped':
298298
case 'union:wrapped':
299-
const subschema = resolveUnion(schema, msg)
299+
let [subschema, submsg] = resolveUnion(schema, msg)
300300
if (subschema == null) {
301-
return null
301+
return msg
302302
}
303-
return await transform(ctx, subschema, msg, fieldTransform)
303+
submsg = await transform(ctx, subschema, submsg, fieldTransform)
304+
if (schema.typeName === 'union:wrapped') {
305+
return {[subschema.branchName!]: submsg}
306+
}
307+
return submsg
304308
case 'array':
305309
const arraySchema = schema as ArrayType
306310
const array = msg as any[]
@@ -408,15 +412,15 @@ function disjoint(slice1: Set<string>, map1: Set<string>): boolean {
408412
return true
409413
}
410414

411-
function resolveUnion(schema: Type, msg: any): Type | null {
415+
function resolveUnion(schema: Type, msg: any): [Type | null, any] {
412416
let unionTypes = null
413417
if (schema.typeName === 'union:unwrapped') {
414418
const union = schema as UnwrappedUnionType
415419
unionTypes = union.types.slice()
416420
if (unionTypes != null) {
417421
for (let i = 0; i < unionTypes.length; i++) {
418422
if (unionTypes[i].isValid(msg)) {
419-
return unionTypes[i]
423+
return [unionTypes[i], msg]
420424
}
421425
}
422426
}
@@ -429,13 +433,15 @@ function resolveUnion(schema: Type, msg: any): Type | null {
429433
let name = keys[0]
430434
for (let i = 0; i < unionTypes.length; i++) {
431435
if (unionTypes[i].branchName === name) {
432-
return unionTypes[i]
436+
return [unionTypes[i], msg[name]]
433437
}
434438
}
439+
} else {
440+
throw new Error('wrapped unions require a name/value pair with the name as the type name')
435441
}
436442
}
437443
}
438-
return null
444+
return [null, msg]
439445
}
440446

441447
function getInlineTags(info: SchemaInfo, deps: Map<string, string>): Map<string, Set<string>> {

schemaregistry/test/serde/avro.spec.ts

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -357,6 +357,61 @@ const complexNestedSchema = `
357357
]
358358
}`;
359359

360+
const wrappedUnionSchema = `{
361+
"fields": [
362+
{
363+
"name": "id",
364+
"type": "int"
365+
},
366+
{
367+
"name": "result",
368+
"type": [
369+
"null",
370+
{
371+
"fields": [
372+
{
373+
"name": "code",
374+
"type": "int"
375+
},
376+
{
377+
"confluent:tags": [
378+
"PII"
379+
],
380+
"name": "secret",
381+
"type": [
382+
"null",
383+
"string"
384+
]
385+
}
386+
],
387+
"name": "Data",
388+
"type": "record"
389+
},
390+
{
391+
"fields": [
392+
{
393+
"name": "code",
394+
"type": "int"
395+
},
396+
{
397+
"name": "reason",
398+
"type": [
399+
"null",
400+
"string"
401+
]
402+
}
403+
],
404+
"name": "Error",
405+
"type": "record"
406+
}
407+
]
408+
}
409+
],
410+
"name": "Result",
411+
"namespace": "com.acme",
412+
"type": "record"
413+
}`;
414+
360415
class FakeClock extends Clock {
361416
fixedNow: number = 0
362417

@@ -1473,6 +1528,71 @@ describe('AvroSerializer', () => {
14731528
expect(obj2.stringField).not.toEqual("hi");
14741529
expect(obj2.bytesField).not.toEqual(Buffer.from([1, 2]));
14751530
})
1531+
it('basic encryption with wrapped union', async () => {
1532+
let conf: ClientConfig = {
1533+
baseURLs: [baseURL],
1534+
cacheCapacity: 1000
1535+
}
1536+
let client = SchemaRegistryClient.newClient(conf)
1537+
let serConfig: AvroSerializerConfig = {
1538+
useLatestVersion: true,
1539+
ruleConfig: {
1540+
secret: 'mysecret'
1541+
}
1542+
}
1543+
let ser = new AvroSerializer(client, SerdeType.VALUE, serConfig)
1544+
let dekClient = fieldEncryptionExecutor.executor.client!
1545+
1546+
let encRule: Rule = {
1547+
name: 'test-encrypt',
1548+
kind: 'TRANSFORM',
1549+
mode: RuleMode.WRITEREAD,
1550+
type: 'ENCRYPT',
1551+
tags: ['PII'],
1552+
params: {
1553+
'encrypt.kek.name': 'kek1',
1554+
'encrypt.kms.type': 'local-kms',
1555+
'encrypt.kms.key.id': 'mykey',
1556+
},
1557+
onFailure: 'ERROR,NONE'
1558+
}
1559+
let ruleSet: RuleSet = {
1560+
domainRules: [encRule]
1561+
}
1562+
1563+
let info: SchemaInfo = {
1564+
schemaType: 'AVRO',
1565+
schema: wrappedUnionSchema,
1566+
ruleSet
1567+
}
1568+
1569+
await client.register(subject, info, false)
1570+
1571+
let obj = {
1572+
id: 123,
1573+
result: {
1574+
'com.acme.Data': {
1575+
code: 456,
1576+
secret: 'mypii'
1577+
}
1578+
}
1579+
}
1580+
let bytes = await ser.serialize(topic, obj)
1581+
1582+
// reset encrypted field
1583+
obj.result["com.acme.Data"].secret = 'mypii'
1584+
1585+
let deserConfig: AvroDeserializerConfig = {
1586+
ruleConfig: {
1587+
secret: 'mysecret'
1588+
}
1589+
}
1590+
let deser = new AvroDeserializer(client, SerdeType.VALUE, deserConfig)
1591+
fieldEncryptionExecutor.executor.client = dekClient
1592+
let obj2 = await deser.deserialize(topic, bytes)
1593+
expect(obj2.result["com.acme.Data"].code).toEqual(obj.result["com.acme.Data"].code);
1594+
expect(obj2.result["com.acme.Data"].secret).toEqual(obj.result["com.acme.Data"].secret);
1595+
})
14761596
it('basic encryption with logical type', async () => {
14771597
let conf: ClientConfig = {
14781598
baseURLs: [baseURL],

0 commit comments

Comments
 (0)