Skip to content

Commit 0da03a1

Browse files
authored
Merge pull request #106 from vcstuff/awoie/add-cwt
Add CBOR/CWT encoding
2 parents 1b1a63f + 2607d3b commit 0da03a1

File tree

10 files changed

+359
-31
lines changed

10 files changed

+359
-31
lines changed

.github/workflows/ghpages.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ jobs:
2828

2929
- name: "Generate examples"
3030
run: |
31+
npm install -g cborg
3132
python3 -m pip install --upgrade pip
3233
python3 -m pip install -r src/requirements.txt
3334
python3 src/main.py

draft-ietf-oauth-status-list.md

Lines changed: 190 additions & 19 deletions
Large diffs are not rendered by default.

example/status_list_cwt_diag

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
d2 # tag(18)
2+
84 # array(4)
3+
53 # bytes(19)
4+
a20126106e7374617475736c # "¢\x01&\x10nstatusl"
5+
6973742b637774 # "ist+cwt"
6+
a1 # map(1)
7+
04 # uint(4)
8+
42 # bytes(2)
9+
3132 # "12"
10+
58 60 # bytes(96)
11+
a502782168747470733a2f2f # "¥\x02x!https://"
12+
6578616d706c652e636f6d2f # "example.com/"
13+
7374617475736c697374732f # "statuslists/"
14+
31017368747470733a2f2f65 # "1\x01shttps://e"
15+
78616d706c652e636f6d061a # "xample.com\x06\x1a"
16+
648c3fca041a8898c3ca19ff # "d\x8c?Ê\x04\x1a\x88\x98ÃÊ\x19ÿ"
17+
fe56a2646269747301636c73 # "þV¢dbits\x01cls"
18+
744a78dadbb918000217015d # "tJxÚÛ¹\x18\x00\x02\x17\x01]"
19+
58 40 # bytes(64)
20+
3fd60a6d10eb4b4131f1f6c1 # "?Ö\x0am\x10ëKA1ñöÁ"
21+
2fb365ae27b969e8e8df0b4f # "/³e®'¹ièèß\x0bO"
22+
4029815b679cb1051c1c9eb3 # "@)\x81[g\x9c±\x05\x1c\x1c\x9e³"
23+
6aa72f6f17bcfdb5ed443bdf # "j§/o\x17¼ýµíD;ß"
24+
c2339568ab42949169b413e7 # "Â3\x95h«B\x94\x91i´\x13ç"
25+
02ae1e6a # "\x02®\x1ej"
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
a2 # map(2)
2+
64 # string(4)
3+
62697473 # "bits"
4+
02 # uint(2)
5+
63 # string(3)
6+
6c7374 # "lst"
7+
4b # bytes(11)
8+
78da3be9f2130003df0207 # "xÚ;éò\x13\x00\x03ß\x02\x07"
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
a2 # map(2)
2+
64 # string(4)
3+
62697473 # "bits"
4+
01 # uint(1)
5+
63 # string(3)
6+
6c7374 # "lst"
7+
4a # bytes(10)
8+
78dadbb918000217015d # "xÚÛ¹\x18\x00\x02\x17\x01]"

src/main.py

Lines changed: 45 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -50,26 +50,51 @@ def exampleStatusList2Bit() -> StatusList:
5050

5151
def statusListEncoding1Bit():
5252
status_list = exampleStatusList1Bit()
53-
encoded = status_list.encodeObject()
53+
encoded = status_list.encodeAsJSON()
5454
text = "byte_array = [{}, {}] \nencoded:\n{}".format(
5555
hex(status_list.list[0]),
5656
hex(status_list.list[1]),
5757
util.printObject(encoded)
5858
)
59-
util.outputFile(folder + "status_list_encoding", text)
59+
util.outputFile(folder + "status_list_encoding_json", text)
6060

61+
def statusListEncoding1BitCBOR():
62+
status_list = exampleStatusList1Bit()
63+
encoded = status_list.encodeAsCBOR()
64+
hex_encoded = encoded.hex()
65+
text = "byte_array = [{}, {}] \nencoded:\n{}".format(
66+
hex(status_list.list[0]),
67+
hex(status_list.list[1]),
68+
util.printText(hex_encoded)
69+
)
70+
util.outputFile(folder + "status_list_encoding_cbor", text)
71+
diag = util.printCBORDiagnostics(encoded)
72+
util.outputFile(folder + "status_list_encoding_cbor_diag", diag)
6173

6274
def statusListEncoding2Bit():
6375
status_list = exampleStatusList2Bit()
64-
encoded = status_list.encodeObject()
76+
encoded = status_list.encodeAsJSON()
6577
text = "byte_array = [{}, {}, {}] \nencoded:\n{}".format(
6678
hex(status_list.list[0]),
6779
hex(status_list.list[1]),
6880
hex(status_list.list[2]),
6981
util.printObject(encoded),
7082
)
71-
util.outputFile(folder + "status_list_encoding2", text)
83+
util.outputFile(folder + "status_list_encoding2_json", text)
7284

85+
def statusListEncoding2BitCBOR():
86+
status_list = exampleStatusList2Bit()
87+
encoded = status_list.encodeAsCBOR()
88+
hex_encoded = encoded.hex()
89+
text = "byte_array = [{}, {}, {}] \nencoded:\n{}".format(
90+
hex(status_list.list[0]),
91+
hex(status_list.list[1]),
92+
hex(status_list.list[2]),
93+
util.printText(hex_encoded),
94+
)
95+
util.outputFile(folder + "status_list_encoding2_cbor", text)
96+
diag = util.printCBORDiagnostics(encoded)
97+
util.outputFile(folder + "status_list_encoding2_cbor_diag", diag)
7398

7499
def statusListJWT():
75100
status_list = exampleStatusList1Bit()
@@ -83,10 +108,26 @@ def statusListJWT():
83108
text = util.formatToken(status_jwt, key)
84109
util.outputFile(folder + "status_list_jwt", text)
85110

111+
def statusListCWT():
112+
status_list = exampleStatusList1Bit()
113+
cwt = StatusListToken(
114+
issuer="https://example.com",
115+
subject="https://example.com/statuslists/1",
116+
list=status_list,
117+
key=key,
118+
alg=-7,
119+
)
120+
status_cwt = cwt.buildCWT(iat=iat, exp=exp)
121+
hex_encoded = status_cwt.hex()
122+
util.outputFile(folder + "status_list_cwt", util.printText(hex_encoded))
123+
util.outputFile(folder + "status_list_cwt_diag", util.printCBORDiagnostics(status_cwt))
86124

87125
if __name__ == "__main__":
88126
if not os.path.exists(folder):
89127
os.makedirs(folder)
90128
statusListEncoding1Bit()
91129
statusListEncoding2Bit()
92130
statusListJWT()
131+
statusListEncoding1BitCBOR()
132+
statusListEncoding2BitCBOR()
133+
statusListCWT()

src/requirements.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,3 @@
11
jwcrypto==1.4.2
2+
cbor2==5.6.1
3+
cwt==2.7.4

src/status_list.py

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from base64 import urlsafe_b64decode, urlsafe_b64encode
22
from typing import Dict
33
import zlib
4+
from cbor2 import dumps, loads
45

56

67
class StatusList:
@@ -21,18 +22,29 @@ def fromEncoded(cls, encoded: str, bits: int = 1):
2122
new.decode(encoded)
2223
return new
2324

24-
def encode(self) -> str:
25+
def encodeAsString(self) -> str:
2526
zipped = zlib.compress(self.list, level=9)
2627
return urlsafe_b64encode(zipped).decode().strip("=")
27-
28-
def encodeObject(self) -> Dict:
29-
encoded_list = self.encode()
28+
29+
def encodeAsBytes(self) -> bytes:
30+
return zlib.compress(self.list, level=9)
31+
32+
def encodeAsJSON(self) -> Dict:
33+
encoded_list = self.encodeAsString()
3034
object = {
3135
"bits": self.bits,
3236
"lst": encoded_list,
3337
}
3438
return object
3539

40+
def encodeAsCBOR(self) -> Dict:
41+
encoded_list = self.encodeAsBytes()
42+
object = {
43+
"bits": self.bits,
44+
"lst": encoded_list,
45+
}
46+
return dumps(object)
47+
3648
def decode(self, input: str):
3749
zipped = urlsafe_b64decode(f"{input}{'=' * divmod(len(input),4)[1]}")
3850
self.list = bytearray(zlib.decompress(zipped))

src/status_token.py

Lines changed: 56 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
11
from jwcrypto import jwk, jwt
2+
from cwt import COSE, COSEKey, CWTClaims, COSEHeaders
23
from status_list import StatusList
34
from datetime import datetime
45
from typing import Dict
6+
from cbor2 import dumps
57
import json
68

79
DEFAULT_ALG = "ES256"
8-
STATUS_LIST_TYP = "statuslist+jwt"
10+
STATUS_LIST_TYP_JWT = "statuslist+jwt"
11+
STATUS_LIST_TYP_CWT = "statuslist+cwt"
912

1013

1114
class StatusListToken:
@@ -45,7 +48,7 @@ def fromJWT(cls, input: str, key: jwk.JWK):
4548
header = json.loads(decoded.header)
4649
alg = header["alg"]
4750
typ = header["typ"]
48-
assert typ == STATUS_LIST_TYP
51+
assert typ == STATUS_LIST_TYP_JWT
4952
claims = json.loads(decoded.claims)
5053
status_list = claims["status_list"]
5154
lst = status_list["lst"]
@@ -88,7 +91,7 @@ def buildJWT(
8891
claims["iat"] = int(iat.timestamp())
8992
if exp is not None:
9093
claims["exp"] = int(exp.timestamp())
91-
claims["status_list"] = self.list.encodeObject()
94+
claims["status_list"] = self.list.encodeAsJSON()
9295

9396
# build header
9497
if optional_header is not None:
@@ -98,8 +101,57 @@ def buildJWT(
98101
if self._key.key_id:
99102
header["kid"] = self._key.key_id
100103
header["alg"] = self._alg
101-
header["typ"] = STATUS_LIST_TYP
104+
header["typ"] = STATUS_LIST_TYP_JWT
102105

103106
token = jwt.JWT(header=header, claims=claims)
104107
token.make_signed_token(self._key)
105108
return token.serialize(compact=compact)
109+
110+
def buildCWT(
111+
self,
112+
iat: datetime = datetime.utcnow(),
113+
exp: datetime = None,
114+
optional_claims: Dict = None,
115+
optional_protected_header: Dict = None,
116+
optional_unprotected_header: Dict = None
117+
) -> bytes:
118+
# build claims
119+
if optional_claims is not None:
120+
claims = optional_claims
121+
else:
122+
claims = {}
123+
claims[CWTClaims.SUB] = self.subject
124+
claims[CWTClaims.ISS] = self.issuer
125+
claims[CWTClaims.IAT] = int(iat.timestamp())
126+
if exp is not None:
127+
claims[CWTClaims.EXP] = int(exp.timestamp())
128+
claims[65534] = self.list.encodeAsCBOR() # no CWT claim key assigned yet by IANA
129+
130+
# build header
131+
if optional_protected_header is not None:
132+
protected_header = optional_protected_header
133+
else:
134+
protected_header = {}
135+
136+
if optional_unprotected_header is not None:
137+
unprotected_header = optional_unprotected_header
138+
else:
139+
unprotected_header = {}
140+
141+
if self._key.key_id:
142+
unprotected_header[COSEHeaders.KID] = self._key.key_id.encode('utf-8')
143+
protected_header[COSEHeaders.ALG] = self._alg
144+
protected_header[16] = STATUS_LIST_TYP_CWT
145+
146+
key = COSEKey.from_jwk(self._key)
147+
148+
# The sender side:
149+
sender = COSE.new()
150+
encoded = sender.encode(
151+
dumps(claims),
152+
key,
153+
protected=protected_header,
154+
unprotected=unprotected_header
155+
)
156+
157+
return encoded

src/util.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from jwcrypto import jwk, jwt
22
from textwrap import fill
33
from typing import Dict
4+
import subprocess
45
import json
56

67
example = {
@@ -40,6 +41,13 @@ def printText(input: str) -> str:
4041
return fill(input, width=MAX_LENGTH, break_on_hyphens=False)
4142

4243

44+
# TODO: find a better way to do create CBOR Diagnostics output
45+
# this is still too wide
46+
def printCBORDiagnostics(input: bytes) -> str:
47+
diag = subprocess.check_output("cborg hex2diag " + input.hex(), shell=True).decode('utf8')
48+
return diag
49+
50+
4351
def outputFile(file_name: str, input: str):
4452
with open(file_name, "w") as file:
4553
file.write(input)

0 commit comments

Comments
 (0)