Skip to content

Commit a6099b6

Browse files
committed
compress: Compression statistics and compress for splay / partitions
1 parent d62bda8 commit a6099b6

File tree

1 file changed

+218
-0
lines changed

1 file changed

+218
-0
lines changed

src/compress.q

Lines changed: 218 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,218 @@
1+
// Disk Compression Functions
2+
// Copyright (c) 2021 Jaskirat Rajasansir
3+
4+
.require.lib each `type`file`time;
5+
6+
7+
/ Schemas returned by .compress.getSplayStats, .compress.getPartitionStats and .compress.splay
8+
.compress.cfg.schemas:(`symbol$())!();
9+
.compress.cfg.schemas[`infoSplay]: flip `column`compressedLength`uncompressedLength`algorithm`logicalBlockSize`zipLevel!"SJJIII"$\:();
10+
.compress.cfg.schemas[`infoPartition]: flip `part`table`column`compressedLength`uncompressedLength`algorithm`logicalBlockSize`zipLevel!"*SSJJIII"$\:();
11+
.compress.cfg.schemas[`compSplay]: flip `col`source`target`compressed`inplace`empty`writeMode!"SSSBBBS"$\:();
12+
13+
/ Splay and partition compression option defaults provide the following behaviour
14+
/ - recompress (0b): Any compressed files will be copied
15+
/ - inplace (0b): Source splay path = target splay path will result in error
16+
/ - srcParTxt (1b): 'par.txt' in source partition HDB root will be used
17+
/ - tgtParTxt (1b): 'par.txt' in target partition HDB root will be used
18+
.compress.cfg.compressDefaults:`recompress`inplace`srcParTxt`tgtParTxt!0011b;
19+
20+
21+
/ Default compression modes for each compression type supported within kdb+
22+
.compress.defaults:(`symbol$())!();
23+
.compress.defaults[`none]: (0; 0; 0);
24+
.compress.defaults[`qipc]: (17; 1; 0);
25+
.compress.defaults[`gzip]: (17; 2; 7);
26+
.compress.defaults[`snappy]:(17; 3; 0);
27+
.compress.defaults[`lz4hc]: (17; 4; 9);
28+
29+
30+
.compress.init:{};
31+
32+
33+
/ NOTE: Columns that are uncompressed will have null values for all information in the returned table
34+
/ @param splayPath (FolderPath) A folder path of a splayed table
35+
/ @returns (Table) The compressed stats (via -21!) of each column within the specified splay path
36+
/ @throws InvalidSplayPathException If the specified splay path does not exist, or does not contain a splayed table
37+
/ @see .compress.cfg.schemas
38+
.compress.getSplayStats:{[splayPath]
39+
/ Needs a trailing slash for .Q.qp (in .type.isSplayedTable) to work correctly
40+
if[not "/" = last string splayPath;
41+
splayPath:` sv splayPath,`;
42+
];
43+
44+
if[not[.type.isFolder splayPath] | not .type.isSplayedTable splayPath;
45+
'"InvalidSplayPathException";
46+
];
47+
48+
splayCols:cols splayPath;
49+
50+
compressStats:-21!/:` sv/: splayPath,/:splayCols;
51+
52+
statsTbl:.compress.cfg.schemas[`infoSplay] upsert/ compressStats;
53+
statsTbl:update column:splayCols from statsTbl;
54+
:statsTbl;
55+
};
56+
57+
/ NOTE: Columns that are uncompressed will have null values for all information in the returned table
58+
/ @param hdbRoot (FolderPath) The root folder containing a partitioned HDB
59+
/ @param partVal (Date|Month|Year|Long) The specific partition within the HDB to retrieve compression stats for
60+
/ @returns (Table) The compression stats (via -21!) of each column within each table within the specified HDB partition
61+
/ @throws InvalidHdbRootException If the specified HDB root folder does not exist
62+
/ @see .compress.getSplayStats
63+
.compress.getPartitionStats:{[hdbRoot; partVal]
64+
if[not .type.isFolder hdbRoot;
65+
'"InvalidHdbRootException";
66+
];
67+
68+
partRoot:` sv hdbRoot,.type.ensureSymbol partVal;
69+
partTbls:.file.ls partRoot;
70+
partTblPaths:` sv/: partRoot,/:partTbls,\:`;
71+
72+
compressStats:{[path; tbl]
73+
:update table:tbl from .compress.getSplayStats path;
74+
}'[partTblPaths; partTbls];
75+
76+
statsTbl:.compress.cfg.schemas[`infoPartition] upsert raze compressStats;
77+
statsTbl:update part:partVal from statsTbl;
78+
:statsTbl;
79+
};
80+
81+
82+
/ Compresses splayed tables
83+
/ Based on the specified parameters, the functions behaviour (returned in the 'writeMode' column) for each column will be:
84+
/ - 'compress': The file is uncompressed, or is compressed and the 'recompress' option is true
85+
/ - 'copy': The file is either empty (0 = count) or is already compressed and the 'recompress' option is missing or false
86+
/ - 'ignore': The would've been copied (as above) but inplace
87+
/ @param sourceSplayPath (FolderPath) The source splay
88+
/ @param targetSplayPath (FolderPath) The target splay. This can be the same as 'sourceSplayPath' ONLY if the 'inplace' option is set to true
89+
/ @param compressType (Symbol|IntegerList) The compression type. If a symbol, the compression settings will be taken from '.compress.defaults'
90+
/ @param options (Dict) 'recompress' - see description above. 'inplace' - must be true if the target is the same as the source
91+
/ @returns (Table) Details of the columns and how they were compressed to the target
92+
/ @throws InvalidSourceSplayPathException If the source path specified is not a splay table folder
93+
/ @throws TargetAlreadyExistsException If the specified target is already a folder
94+
/ @throws InplaceCompressionForbiddenException If the specified target is the same as the source but the 'inplace' option is not set to true
95+
/ @throws InvalidCompressTypeException If the symbol reference is not in '.compress.defaults' or not a 3-element integer list
96+
/ @see .compress.cfg.compressDefaults
97+
/ @see .compress.cfg.schemas
98+
/ @see .compress.defaults
99+
.compress.splay:{[sourceSplayPath; targetSplayPath; compressType; options]
100+
options:.compress.cfg.compressDefaults ^ options;
101+
102+
/ Needs a trailing slash for .Q.qp (in .type.isSplayedTable) to work correctly
103+
if[not .type.isSplayedTable ` sv sourceSplayPath,`;
104+
'"InvalidSourceSplayPathException";
105+
];
106+
107+
if[0 < count .file.ls targetSplayPath;
108+
if[not targetSplayPath = sourceSplayPath;
109+
.log.if.error ("Target folder already exists. Will not compress data to this folder [ Target: {} ]"; targetSplayPath);
110+
'"TargetAlreadyExistsException";
111+
];
112+
113+
if[(targetSplayPath = sourceSplayPath) & not options`inplace;
114+
.log.if.error ("Inplace compression required, but not explicitly specified. Will not compress [ Target: {} ]"; targetSplayPath);
115+
'"InplaceCompressionForbiddenException";
116+
];
117+
];
118+
119+
if[.type.isSymbol compressType;
120+
if[not compressType in key .compress.defaults;
121+
'"InvalidCompressTypeException";
122+
];
123+
124+
compressType:.compress.defaults compressType;
125+
];
126+
127+
if[not 3 = count compressType;
128+
'"InvalidCompressTypeException";
129+
];
130+
131+
132+
compressCfg:.compress.cfg.schemas[`compSplay] upsert flip enlist[`col]!enlist cols sourceSplayPath;
133+
compressCfg:update source:(` sv/: sourceSplayPath,/: col), target:(` sv/: targetSplayPath,/: col) from compressCfg;
134+
compressCfg:update compressed:.file.isCompressed each source from compressCfg;
135+
compressCfg:update empty:0 = count first .Q.V sourceSplayPath from compressCfg;
136+
compressCfg:update inplace:source=target from compressCfg;
137+
138+
compressCfg:update writeMode:`compress`copy compressed from compressCfg;
139+
compressCfg:update writeMode:`ignore from compressCfg where inplace, writeMode = `copy;
140+
141+
$[any compressCfg`empty;
142+
compressCfg:update writeMode:`copy from compressCfg where not writeMode = `ignore;
143+
options`recompress;
144+
compressCfg:update writeMode:`compress from compressCfg
145+
];
146+
147+
148+
.log.if.info ("Starting splay table compression [ Source: {} ] [ Target: {} ] [ Compression: {} ]"; sourceSplayPath; targetSplayPath; compressType);
149+
st:.time.now[];
150+
151+
.file.ensureDir targetSplayPath;
152+
153+
{[compressType; colCompressCfg]
154+
.log.if.debug enlist["Processing column [ Source: {} ] [ Target: {} ] [ Write Mode: {} ]"],colCompressCfg`source`target`writeMode;
155+
156+
$[`ignore = colCompressCfg`writeMode;
157+
:(::);
158+
`copy = colCompressCfg`writeMode;
159+
.os.run[`cp; "|" sv 1_/: string colCompressCfg`source`target];
160+
`compress = colCompressCfg`writeMode;
161+
-19!colCompressCfg[`source`target],compressType
162+
];
163+
164+
}[compressType;] each compressCfg;
165+
166+
/ Copy the '.d' file at the end
167+
-19!(` sv sourceSplayPath,`.d; ` sv targetSplayPath,`.d),.compress.defaults`none;
168+
169+
.log.if.info ("Splay table compression complete [ Source: {} ] [ Target: {} ] [ Compression: {} ] [ Time Taken: {} ]"; sourceSplayPath; targetSplayPath; compressType; .time.now[] - st);
170+
171+
:compressCfg;
172+
};
173+
174+
/ Compresses multiple splayed tables within a HDB partition.
175+
/ NOTE: The 'sym' file of the source HDB is not copied or symlinked to the target HDB
176+
/ @param sourceRoot (FolderPath) The path of the source HDB
177+
/ @param targetRoot (FolderPath) The path of the target HDB
178+
/ @param partVal (Date|Month|Year|Long) The specific partition within the HDB to compress
179+
/ @param tbls (SymbolList) The list of tables in the partition to compress
180+
/ @param compressType (Symbol|IntegerList) See '.compress.splay'
181+
/ @param options (Dict) See '.compress.splay', 'srcParTxt' / 'tgtParTxt' - set to false to ignore 'par.txt' in source or target HDBs respectively
182+
/ @throws SourceHdbPartitionDoesNotExistException If the specified source HDB does not exist
183+
/ @see .compress.cfg.compressDefaults
184+
/ @see .compress.splay
185+
.compress.partition:{[sourceRoot; targetRoot; partVal; tbls; compressType; options]
186+
options:.compress.cfg.compressDefaults ^ options;
187+
188+
srcPartPath:.file.hdb.qPar[sourceRoot; partVal];
189+
tgtPartPath:.file.hdb.qPar[targetRoot; partVal];
190+
191+
if[not options`srcParTxt;
192+
srcPartPath:` sv sourceRoot,.type.ensureSymbol partVal;
193+
];
194+
195+
if[not options`tgtParTxt;
196+
tgtPartPath:` sv targetRoot,.type.ensureSymbol partVal;
197+
];
198+
199+
if[not .type.isFolder srcPartPath;
200+
.log.if.error ("Source HDB partition does not exist [ Path: {} ] [ par.txt: {} ]"; srcPartPath; `no`yes options`srcParTxt);
201+
'"SourceHdbPartitionDoesNotExistException";
202+
];
203+
204+
205+
srcTables:tbls inter .file.ls srcPartPath;
206+
207+
srcTblPaths:` sv/: srcPartPath,/:srcTables;
208+
tgtTblPaths:` sv/: tgtPartPath,/:srcTables;
209+
210+
.log.if.info ("Starting HDB partition compression [ Source HDB: {} ] [ Target HDB: {} ] [ Partition: {} ] [ Tables: {} ] [ Compression Type: {} ]"; sourceRoot; targetRoot; partVal; srcTables; compressType);
211+
st:.time.now[];
212+
213+
compressCfg:.compress.splay[;; compressType; options]'[srcTblPaths; tgtTblPaths];
214+
215+
.log.if.info ("HDB partition compression complete [ Source HDB: {} ] [ Target HDB: {} ] [ Partition: {} ] [ Tables: {} ] [ Compression Type: {} ] [ Time Taken: {} ]"; sourceRoot; targetRoot; partVal; srcTables; compressType; .time.now[] - st);
216+
217+
:compressCfg;
218+
};

0 commit comments

Comments
 (0)