Skip to content

Commit 7dc6082

Browse files
committed
[ISSUE#51]: to_avro working
Signed-off-by: ashish <ashishpatel0720@gmail.com> Signed-off-by: ashish <ashishpatel0720@gmail.com>
1 parent 8ae2687 commit 7dc6082

File tree

8 files changed

+168
-36021
lines changed

8 files changed

+168
-36021
lines changed

src/commands/avro.ts

Lines changed: 29 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
import {Command, flags} from '@oclif/command'
2+
import {createFileEncoder} from 'avsc'
23
import * as avro from 'avsc'
3-
import * as chalk from 'chalk' // includes all from avro-js and some more
4+
import * as chalk from 'chalk'
5+
import * as fs from 'fs' // includes all from avro-js and some more
46

57
import Logger from '../utilities/logger'
68
import Utilities from '../utilities/utilities'
@@ -12,7 +14,7 @@ export default class Avro extends Command {
1214
help: flags.help({char: 'h'}),
1315
file: flags.string({char: 'f' , description: 'input file path'}),
1416
output: flags.string({char: 'o' , description: 'output file path'}),
15-
schema: flags.string({char: 't' , description: 'schema type file path'}),
17+
schemaType: flags.string({char: 't' , description: 'schema type file path'}),
1618

1719
}
1820

@@ -55,7 +57,7 @@ export default class Avro extends Command {
5557
avro.createFileDecoder(flags.file)
5658
.on('metadata', function (type) {
5759
let output = type.schema()
58-
let schemaStr = JSON.stringify(output, null, '\t')
60+
let schemaStr = JSON.stringify(output)
5961
Utilities.writeStringToFile(this, flags.output, schemaStr)
6062
})
6163

@@ -64,20 +66,35 @@ export default class Avro extends Command {
6466
Utilities.truncateFile(this, flags.output)
6567
avro.createFileDecoder(flags.file)
6668
.on('data', function (recordStr) {
67-
Utilities.appendStringToFile(this, flags.output, JSON.stringify(recordStr, null, '\t'))
69+
Utilities.appendStringToFile(this, flags.output, JSON.stringify(recordStr))
6870
})
6971
Logger.success(this, `output written to file: ${chalk.green(flags.output)}`) // this will output error and exit command
7072
}
73+
7174
private toAvro(flags: any, args: any) {
72-
if (!flags.schema)
75+
if (!flags.schemaType)
7376
Logger.error(this, 'Schema file is not provided')
74-
this.checkValidSchema(flags, args)
75-
}
7677

77-
private checkValidSchema(flags: any, args: any) {
78-
let avroFile = avro.parse(flags.file)
79-
// var schemaFile = {name: 'Bob', address: {city: 'Cambridge', zip: '02139'}}
80-
// var status = type.isValid(person); // Boolean status.
81-
console.log(avroFile)
78+
let schema = avro.parse(flags.schemaType)
79+
let avroEncoder = new avro.streams.BlockEncoder(schema)
80+
81+
avroEncoder.pipe(fs.createWriteStream(flags.output))
82+
83+
// We write the records to the block encoder, which will take care of serializing them
84+
// into an object container file.
85+
86+
let jsonStr = '[' + Utilities.getInputString(this, flags, args) + ']'
87+
jsonStr = jsonStr.replace(/[\s\n]+/mg, '')
88+
jsonStr = jsonStr.replace(/\}\{/mg, '},{')
89+
let jsonObjects = JSON.parse(jsonStr)
90+
91+
jsonObjects.forEach( function(data){
92+
if (schema.isValid(data)) {
93+
avroEncoder.write(data)
94+
} else {
95+
Logger.warn(this, `${chalk.yellow('[SKIPPING RECORD]')} schema is invalid: ${chalk.yellowBright(JSON.stringify(data))}`)
96+
}
97+
})
98+
avroEncoder.end()
8299
}
83100
}

src/utilities/utilities.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,9 @@ export default class Utilities {
4646
Logger.warn(thisRef, `File already exists: ${chalk.green(filePath)}, ${chalk.yellow('overriding content')}`) // this will output error and exit command
4747

4848
fs.writeFileSync(filePath, string)
49-
Logger.success(thisRef, `output written to file: ${chalk.green(filePath)}`) // this will output error and exit command
49+
50+
if (string !== '') // this condition comes for truncating
51+
Logger.success(thisRef, `output written to file: ${chalk.green(filePath)}`) // this will output error and exit command
5052
// return `${chalk.red(pkg)} ${message}`
5153

5254
}

test/resources/avro/complex.json

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
{
2+
"stringArray": [
3+
"string1",
4+
"string2",
5+
"string3",
6+
"string4",
7+
"string5"
8+
],
9+
"longArray": [
10+
1,
11+
2,
12+
3,
13+
4,
14+
5
15+
],
16+
"enumField": "C",
17+
"mapOfInts": {
18+
"key4": 4,
19+
"key3": 3,
20+
"key5": 5,
21+
"key2": 2,
22+
"key1": 1
23+
},
24+
"unionField": "union value",
25+
"fixedField": {
26+
"type": "Buffer",
27+
"data": [
28+
1,
29+
2,
30+
3,
31+
4,
32+
1,
33+
2,
34+
3,
35+
4,
36+
1,
37+
2,
38+
3,
39+
4,
40+
1,
41+
2,
42+
3,
43+
4
44+
]
45+
},
46+
"recordField": {
47+
"longRecordField": 1925639126735,
48+
"stringRecordField": "I am a test record",
49+
"intRecordField": 666,
50+
"floatRecordField": 7171.169921875
51+
}
52+
}
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
{
2+
"name": "example.avro.Complex",
3+
"type": "record",
4+
"fields": [
5+
{
6+
"name": "stringArray",
7+
"type": {
8+
"type": "array",
9+
"items": "string"
10+
}
11+
},
12+
{
13+
"name": "longArray",
14+
"type": {
15+
"type": "array",
16+
"items": "long"
17+
}
18+
},
19+
{
20+
"name": "enumField",
21+
"type": {
22+
"name": "example.avro.foo",
23+
"type": "enum",
24+
"symbols": [
25+
"A",
26+
"B",
27+
"C",
28+
"D"
29+
]
30+
}
31+
},
32+
{
33+
"name": "mapOfInts",
34+
"type": {
35+
"type": "map",
36+
"values": "int"
37+
}
38+
},
39+
{
40+
"name": "unionField",
41+
"type": [
42+
"null",
43+
"string"
44+
]
45+
},
46+
{
47+
"name": "fixedField",
48+
"type": {
49+
"name": "example.avro.md5",
50+
"type": "fixed",
51+
"size": 16
52+
}
53+
},
54+
{
55+
"name": "recordField",
56+
"type": {
57+
"name": "example.avro.TestRecord",
58+
"type": "record",
59+
"fields": [
60+
{
61+
"name": "longRecordField",
62+
"type": "long"
63+
},
64+
{
65+
"name": "stringRecordField",
66+
"type": "string"
67+
},
68+
{
69+
"name": "intRecordField",
70+
"type": "int"
71+
},
72+
{
73+
"name": "floatRecordField",
74+
"type": "float"
75+
}
76+
]
77+
}
78+
}
79+
]
80+
}
286 Bytes
Binary file not shown.

0 commit comments

Comments
 (0)