您现在的位置是:网站首页 > 博客日记 >

jsonpath.jsonpath函数源码

作者:YXN-python 阅读量:105 发布日期:2023-11-07

源码

import re
import sys


def normalize(x):
    """normalize the path expression; outside jsonpath to allow testing"""
    subx = []

    # replace index/filter expressions with placeholders
    # Python anonymous functions (lambdas) are cryptic, hard to debug
    def f1(m):
        n = len(subx)   # before append
        g1 = m.group(1)
        subx.append(g1)
        ret = "[#%d]" % n
        #       print("f1:", g1, ret)
        return ret
    x = re.sub(r"[\['](\??\(.*?\))[\]']", f1, x)

    # added the negative lookbehind -krhodes
    x = re.sub(r"'?(? 1: print("\tf03", key, loc, expr, path)
                    trace(s(key, expr), obj, path)
                walk(loc, x, obj, path, f03)
            elif loc == "..":
                trace(x, obj, path)
                def f04(key, loc, expr, obj, path):
                    if debug > 1: print("\tf04", key, loc, expr, path)
                    if isinstance(obj, dict):
                        if key in obj:
                            trace(s('..', expr), obj[key], s(path, key))
                    else:
                        if key < len(obj):
                            trace(s('..', expr), obj[key], s(path, key))
                walk(loc, x, obj, path, f04)
            elif loc == "!":
                # Perl jsonpath extension: return keys
                def f06(key, loc, expr, obj, path):
                    if isinstance(obj, dict):
                        trace(expr, key, path)
                walk(loc, x, obj, path, f06)
            elif isinstance(obj, dict) and loc in obj:
                trace(x, obj[loc], s(path, loc))
            elif isinstance(obj, list) and isint(loc):
                iloc = int(loc)
                if debug: print("----->", iloc, len(obj))
                if len(obj) > iloc:
                    trace(x, obj[iloc], s(path, loc))
            else:
                # [(index_expression)]
                if loc.startswith("(") and loc.endswith(")"):
                    if debug > 1: print("index", loc)
                    e = evalx(loc, obj)
                    trace(s(e,x), obj, path)
                    return

                # ?(filter_expression)
                if loc.startswith("?(") and loc.endswith(")"):
                    if debug > 1: print("filter", loc)
                    def f05(key, loc, expr, obj, path):
                        if debug > 1: print("f05", key, loc, expr, path)
                        if isinstance(obj, dict):
                            eval_result = evalx(loc, obj[key])
                        else:
                            eval_result = evalx(loc, obj[int(key)])
                        if eval_result:
                            trace(s(key, expr), obj, path)

                    loc = loc[2:-1]
                    walk(loc, x, obj, path, f05)
                    return

                m = re.match(r'(-?[0-9]*):(-?[0-9]*):?(-?[0-9]*)$', loc)
                if m:
                    if isinstance(obj, (dict, list)):
                        def max(x,y):
                            if x > y:
                                return x
                            return y

                        def min(x,y):
                            if x < y:
                                return x
                            return y

                        objlen = len(obj)
                        s0 = m.group(1)
                        s1 = m.group(2)
                        s2 = m.group(3)

                        # XXX int("badstr") raises exception
                        start = int(s0) if s0 else 0
                        end = int(s1) if s1 else objlen
                        step = int(s2) if s2 else 1

                        if start < 0:
                            start = max(0, start+objlen)
                        else:
                            start = min(objlen, start)
                        if end < 0:
                            end = max(0, end+objlen)
                        else:
                            end = min(objlen, end)

                        for i in xrange(start, end, step):
                            trace(s(i, x), obj, path)
                    return

                # after (expr) & ?(expr)
                if loc.find(",") >= 0:
                    # [index,index....]
                    for piece in re.split(r"'?,'?", loc):
                        if debug > 1: print("piece", piece)
                        trace(s(piece, x), obj, path)
        else:
            store(path, obj)

    def walk(loc, expr, obj, path, funct):
        """Invoke ``funct`` once for every index of a list or key of a dict.

        ``funct`` is called as ``funct(key_or_index, loc, expr, obj, path)``;
        ``loc``, ``expr``, ``obj`` and ``path`` are passed through untouched.
        Objects that are neither ``list`` nor ``dict`` are silently ignored.
        """
        if isinstance(obj, list):
            # range() exists on both Python 2 and Python 3, whereas xrange is
            # a NameError on Python 3 unless it is aliased elsewhere.  The
            # index count is fixed up front, exactly as before.
            for index in range(len(obj)):
                funct(index, loc, expr, obj, path)
        elif isinstance(obj, dict):
            for key in obj:
                funct(key, loc, expr, obj, path)

    def evalx(loc, obj):
        """eval expression"""

        if debug: print("evalx", loc)

        # a nod to JavaScript. doesn't work for @.name.name.length
        # Write len(@.name.name) instead!!!
        loc = loc.replace("@.length", "len(__obj)")

        loc = loc.replace("&&", " and ").replace("||", " or ")

        # replace !@.name with 'name' not in obj
        # XXX handle !@.name.name.name....
        def notvar(m):
            return "'%s' not in __obj" % m.group(1)
        loc = re.sub(r"!@\.([a-zA-Z@_0-9-]*)", notvar, loc)

        # replace @.name.... with __obj['name']....
        # handle @.name[.name...].length
        def varmatch(m):
            def brackets(elts):
                ret = "__obj"
                for e in elts:
                    if isint(e):
                        ret += "[%s]" % e # ain't necessarily so
                    else:
                        ret += "['%s']" % e # XXX beware quotes!!!!
                return ret
            g1 = m.group(1)
            elts = g1.split('.')
            if elts[-1] == "length":
                return "len(%s)" % brackets(elts[1:-1])
            return brackets(elts[1:])

        loc = re.sub(r'(? == translation
        # causes problems if a string contains =

        # replace @  w/ "__obj", but \@ means a literal @
        loc = re.sub(r'(?", v)
        return v

    # body of jsonpath()

    # Get caller globals so eval can pick up user functions!!!
    caller_globals = sys._getframe(1).f_globals
    result = []
    if expr and obj:
        cleaned_expr = normalize(expr)
        if cleaned_expr.startswith("$;"):
            cleaned_expr = cleaned_expr[2:]

        # XXX wrap this in a try??
        trace(cleaned_expr, obj, '$')

        if len(result) > 0:
            return result
    return False

语法规则参考:python之jsonpath库

自己写一个

最基本的

仅支持简单语法:

$.data[*].a.aa[*].aaa1

 

def jsonpath1(data, query):
    """Evaluate a minimal JSONPath-like query against nested dicts and lists.

    The query is a dot-separated chain of segments.  Each segment is one of:

    * ``$...``    - any segment starting with ``$`` is skipped (root marker);
    * ``key``     - look up ``key`` in the current dict;
    * ``key[N]``  - look up ``key`` and take element ``N`` of the list
      (negative ``N`` counts from the end);
    * ``key[*]``  - look up ``key`` and fan out over every list element.

    Example: ``$.data[*].a.aa[*].aaa1``.

    Args:
        data: The decoded JSON document (normally a dict).
        query: The path expression.

    Returns:
        For a path without wildcards, the raw value found.  For a path with
        a wildcard, a flat list of all matches (a matched value that is
        itself a list is merged into the result rather than nested).  An
        empty list when nothing matches; a value of ``None`` is treated the
        same as a missing key.

    Raises:
        ValueError: If the bracket content of a segment that resolved to a
            list is neither ``*`` nor an integer.
    """
    parts = query.split('.')
    last_index = len(parts) - 1

    def helper(cur, part_index):
        if part_index > last_index:
            return []
        part = parts[part_index]
        if part.startswith('$'):
            return helper(cur, part_index + 1)
        # Only dicts have keys.  Reaching a scalar or a list here means the
        # path does not match; report that instead of failing on .get().
        if not isinstance(cur, dict):
            return []
        is_last = part_index == last_index

        if not part.endswith(']'):
            sub = cur.get(part)
            if sub is None:
                return []
            return sub if is_last else helper(sub, part_index + 1)

        arr_start = part.find('[')
        sub = cur.get(part[:arr_start])
        if not isinstance(sub, (list, tuple)):
            # Missing key, or a value that cannot be indexed positionally.
            return []
        index_spec = part[arr_start + 1:-1]

        if index_spec == '*':
            if is_last:
                # A trailing wildcard selects the elements themselves.
                return list(sub)
            matches = []
            for item in sub:
                found = helper(item, part_index + 1)
                if isinstance(found, list):
                    matches.extend(found)
                else:
                    matches.append(found)
            return matches

        index = int(index_spec)
        # Bound the index on both sides so an out-of-range negative index is
        # a non-match rather than an IndexError.
        if not -len(sub) <= index < len(sub):
            return []
        item = sub[index]
        return item if is_last else helper(item, part_index + 1)

    return helper(data, 0)


if __name__ == '__main__':
    # Demo for jsonpath1: gather every 'aaa1' value reachable through both
    # wildcard levels of the sample document.
    path = '$.data[*].a.aa[*].aaa1'
    data_dict = {
        'data': [
            {
                'a': {
                    'aa': [
                        {'aaa1': 'aaaa1'},
                        {'aaa1': 'aaaa2'},
                    ],
                },
            },
            {
                'a': {
                    'aa': [
                        {'aaa1': 'aaaa3'},
                        {'aaa2': 'aaa'},
                    ],
                },
            },
            {
                'b': ['bb1', 'bb2'],
            },
        ],
    }
    result = jsonpath1(data_dict, path)
    print(result)  # Output: ['aaaa1', 'aaaa2', 'aaaa3']

支持区分数组中的多个值

# Variant that keeps the matches found under each wildcard-expanded array
# grouped together instead of flattening them into one list.
def jsonpath2(data, query):
    """Evaluate a minimal JSONPath-like query, grouping wildcard matches.

    The query is a dot-separated chain of segments.  Each segment is one of:

    * ``$...``    - any segment starting with ``$`` is skipped (root marker);
    * ``key``     - look up ``key`` in the current dict;
    * ``key[N]``  - look up ``key`` and take element ``N`` of the list
      (negative ``N`` counts from the end);
    * ``key[*]``  - look up ``key`` and fan out over every list element.

    Every non-final ``key[*]`` wraps the matches found beneath it in their
    own list; afterwards, lists that contain nothing but a single list are
    unwrapped.  For ``$.data[*].a.aa[*].aaa1`` this yields one list per
    ``aa`` array, e.g. ``[['aaaa1', 'aaaa2'], ['aaaa3']]``.

    Args:
        data: The decoded JSON document (normally a dict).
        query: The path expression.

    Returns:
        The raw value for a path without wildcards, a (possibly nested) list
        for a path with wildcards, and an empty list when nothing matches.
        A value of ``None`` is treated the same as a missing key.  Note that
        the single-element unwrapping is applied to any list result, so a
        matched value of the form ``[[...]]`` is unwrapped as well.

    Raises:
        ValueError: If the bracket content of a segment that resolved to a
            list is neither ``*`` nor an integer.
    """
    parts = query.split('.')
    last_index = len(parts) - 1

    def helper(cur, part_index):
        if part_index > last_index:
            return []
        part = parts[part_index]
        if part.startswith('$'):
            return helper(cur, part_index + 1)
        # Only dicts have keys.  Reaching a scalar or a list here means the
        # path does not match; report that instead of failing on .get().
        if not isinstance(cur, dict):
            return []
        is_last = part_index == last_index

        if not part.endswith(']'):
            sub = cur.get(part)
            if sub is None:
                return []
            return sub if is_last else helper(sub, part_index + 1)

        arr_start = part.find('[')
        sub = cur.get(part[:arr_start])
        if not isinstance(sub, (list, tuple)):
            # Missing key, or a value that cannot be indexed positionally.
            return []
        index_spec = part[arr_start + 1:-1]

        if index_spec == '*':
            if is_last:
                # A trailing wildcard selects the elements themselves.
                return list(sub)
            group = []
            for item in sub:
                found = helper(item, part_index + 1)
                if isinstance(found, list):
                    group.extend(found)
                else:
                    group.append(found)
            # One extra nesting level per non-final wildcard keeps the
            # matches of this array together.
            return [group]

        index = int(index_spec)
        # Bound the index on both sides so an out-of-range negative index is
        # a non-match rather than an IndexError.
        if not -len(sub) <= index < len(sub):
            return []
        item = sub[index]
        return item if is_last else helper(item, part_index + 1)

    result = helper(data, 0)

    def post_process(lst):
        """Recursively unwrap lists whose only element is another list."""
        new_lst = []
        for item in lst:
            if isinstance(item, list):
                item = post_process(item)
                if len(item) == 1 and isinstance(item[0], list):
                    item = item[0]
            new_lst.append(item)
        # Check the outermost list once more after its items were processed.
        if len(new_lst) == 1 and isinstance(new_lst[0], list):
            new_lst = new_lst[0]
        return new_lst

    if isinstance(result, list):
        return post_process(result)
    return result


if __name__ == '__main__':
    # Demo for jsonpath2: same document and path as the flat variant, but the
    # matches stay grouped per inner 'aa' array.
    path = '$.data[*].a.aa[*].aaa1'
    data_dict = {
        'data': [
            {
                'a': {
                    'aa': [
                        {'aaa1': 'aaaa1'},
                        {'aaa1': 'aaaa2'},
                    ],
                },
            },
            {
                'a': {
                    'aa': [
                        {'aaa1': 'aaaa3'},
                        {'aaa2': 'aaa'},
                    ],
                },
            },
            {
                'b': ['bb1', 'bb2'],
            },
        ],
    }
    result = jsonpath2(data_dict, path)
    print(result)  # Output: [['aaaa1', 'aaaa2'], ['aaaa3']]

 

YXN-python

2023-11-07