#!/usr/bin/env python3
# /h/ was here
import builtins
import io
import pickle
import collections
import torch
import os
import numpy
import _codecs

def encode(*args):
    """Call ``_codecs.encode`` with ``args``, print the call, return the result.

    The positional arguments are forwarded unchanged; a one-line trace of the
    arguments and the encoded value is written to stdout before returning.
    """
    encoded = _codecs.encode(*args)
    trace = f'encode({args}) = {encoded}'
    print(trace)
    return encoded

class RestrictedUnpickler(pickle.Unpickler):
    """Unpickler that resolves only a fixed allow-list of globals.

    Any global outside the allow-list in ``find_class`` raises
    ``pickle.UnpicklingError``. Persistent ids are accepted only when tagged
    ``'storage'`` and are replaced by an empty placeholder storage.
    """

    def persistent_load(self, saved_id):
        """Return a placeholder storage for a ``'storage'`` persistent id.

        Args:
            saved_id: persistent id from the pickle stream; its first element
                must equal ``'storage'``.

        Returns:
            A freshly constructed ``torch.storage._TypedStorage()``; nothing
            from ``saved_id`` beyond the tag is used.

        Raises:
            pickle.UnpicklingError: if ``saved_id`` is empty, not indexable,
                or its first element is not ``'storage'``.
        """
        # Validate with an explicit raise rather than `assert`: assertions are
        # removed under `python -O`, which would silently drop this check.
        try:
            tag = saved_id[0]
        except (TypeError, IndexError, KeyError):
            tag = None
        if tag != 'storage':
            raise pickle.UnpicklingError(
                "persistent id %r is forbidden" % (saved_id,))
        # NOTE(review): `_TypedStorage` is a private torch name — confirm it
        # exists in the torch version this script is run with.
        return torch.storage._TypedStorage()

    def find_class(self, module, name):
        """Resolve ``module``/``name`` against the allow-list.

        Every lookup is printed to stdout before it is checked.

        Raises:
            pickle.UnpicklingError: for any global not explicitly allowed.
        """
        print(f'find class {module} {name}')
        if module == 'collections' and name == 'OrderedDict':
            return getattr(collections, name)
        if module == 'torch._utils' and name == '_rebuild_tensor_v2':
            return torch._utils._rebuild_tensor_v2
        # Both storage names resolve to torch.FloatStorage here.
        if module == 'torch' and name in ['FloatStorage', 'HalfStorage']:
            return torch.FloatStorage
        if module == 'numpy.core.multiarray' and name == 'scalar':
            return numpy.core.multiarray.scalar
        if module == 'numpy' and name == 'dtype':
            return numpy.dtype
        # `_codecs.encode` is swapped for this file's logging wrapper `encode`.
        if module == '_codecs' and name == 'encode':
            return encode
        # Forbid everything else.
        raise pickle.UnpicklingError("global '%s/%s' is forbidden" %
                                     (module, name))

def restricted_loads(s):
    """Unpickle the bytes ``s`` through ``RestrictedUnpickler``.

    Counterpart of ``pickle.loads()``: the bytes are wrapped in an in-memory
    stream and the object loaded by the restricted unpickler is returned.
    """
    stream = io.BytesIO(s)
    unpickler = RestrictedUnpickler(stream)
    return unpickler.load()

# To test that it catches this RCE:
# restricted_loads(b"cos\nsystem\n(S'echo hello world'\ntR.")

# Prerequisite: extract the pickle from the checkpoint archive first, e.g.
#   unzip model.ckpt archive/data.pkl
with open('archive/data.pkl', 'rb') as f:
    st = f.read()

# Load through the restricted unpickler, then dump what came out.
d = restricted_loads(st)
print(dir(d))
print(d.keys())
print(d['callbacks'])
# Page metadata captured along with the script (not part of the code):
#   Pub: 15 Sep 2022 00:22 UTC
#   Edit: 15 Sep 2022 00:26 UTC
#   Views: 6064