From 4c37231af20ebfd72737ac3ade174995f58fe4ce Mon Sep 17 00:00:00 2001 From: wangxinbiao <1412146116@qq.com> Date: Thu, 21 Mar 2024 15:24:51 +0800 Subject: [PATCH] feat:the time consumed for data processing. --- apiserver/graph/generated/generated.go | 525 +++++++++++++++++- apiserver/graph/generated/models_gen.go | 13 +- apiserver/graph/schema/dataprocessing.gql | 8 + .../graph/schema/dataprocessing.graphqls | 13 + .../templates/pg-init-data-configmap.yaml | 2 + .../db-scripts/change/20240319-schema.sql | 3 + pypi/data-processing/requirements.txt | 1 + .../data_process_db_operate.py | 1 + .../src/service/data_process_service.py | 20 + 9 files changed, 584 insertions(+), 2 deletions(-) create mode 100644 pypi/data-processing/db-scripts/change/20240319-schema.sql diff --git a/apiserver/graph/generated/generated.go b/apiserver/graph/generated/generated.go index acbc16e7d..c841824ab 100644 --- a/apiserver/graph/generated/generated.go +++ b/apiserver/graph/generated/generated.go @@ -195,6 +195,7 @@ type ComplexityRoot struct { DataProcessConfigInfo func(childComplexity int) int EndTime func(childComplexity int) int ErrorMsg func(childComplexity int) int + FileDetails func(childComplexity int) int FileNum func(childComplexity int) int FileType func(childComplexity int) int ID func(childComplexity int) int @@ -208,6 +209,7 @@ type ComplexityRoot struct { } DataProcessItem struct { + EndDatetime func(childComplexity int) int ErrorMsg func(childComplexity int) int ID func(childComplexity int) int Name func(childComplexity int) int @@ -362,6 +364,14 @@ type ComplexityRoot struct { Time func(childComplexity int) int } + FileDetails struct { + EndTime func(childComplexity int) int + FileName func(childComplexity int) int + FileSize func(childComplexity int) int + StartTime func(childComplexity int) int + Status func(childComplexity int) int + } + GPT struct { Category func(childComplexity int) int Creator func(childComplexity int) int @@ -1581,6 +1591,13 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in return e.complexity.DataProcessDetailsItem.ErrorMsg(childComplexity), true + case "DataProcessDetailsItem.file_details": + if e.complexity.DataProcessDetailsItem.FileDetails == nil { + break + } + + return e.complexity.DataProcessDetailsItem.FileDetails(childComplexity), true + case "DataProcessDetailsItem.file_num": if e.complexity.DataProcessDetailsItem.FileNum == nil { break @@ -1651,6 +1668,13 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in return e.complexity.DataProcessDetailsItem.Status(childComplexity), true + case "DataProcessItem.end_datetime": + if e.complexity.DataProcessItem.EndDatetime == nil { + break + } + + return e.complexity.DataProcessItem.EndDatetime(childComplexity), true + case "DataProcessItem.error_msg": if e.complexity.DataProcessItem.ErrorMsg == nil { break @@ -2488,6 +2512,41 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in return e.complexity.F.Time(childComplexity), true + case "FileDetails.end_time": + if e.complexity.FileDetails.EndTime == nil { + break + } + + return e.complexity.FileDetails.EndTime(childComplexity), true + + case "FileDetails.file_name": + if e.complexity.FileDetails.FileName == nil { + break + } + + return e.complexity.FileDetails.FileName(childComplexity), true + + case "FileDetails.file_size": + if e.complexity.FileDetails.FileSize == nil { + break + } + + return e.complexity.FileDetails.FileSize(childComplexity), true + + case "FileDetails.start_time": + if e.complexity.FileDetails.StartTime == nil { + break + } + + return e.complexity.FileDetails.StartTime(childComplexity), true + + case "FileDetails.status": + if e.complexity.FileDetails.Status == nil { + break + } + + return e.complexity.FileDetails.Status(childComplexity), true + case "GPT.category": if e.complexity.GPT.Category == nil { break @@ -5313,6 +5372,7 @@ input AddDataProcessInput { # 文件条目 input FileItem { name: String! + size: String } # 数据处理配置条目 @@ -5400,6 +5460,8 @@ type DataProcessItem { post_data_set_version: String # 开始时间 start_datetime: String! + # 结束时间 + end_datetime: String! # 错误日志 error_msg: String } @@ -5458,6 +5520,7 @@ type DataProcessDetailsItem { error_msg: String data_process_config_info: [DataProcessConfigInfo!] config: [DataProcessConfig!] + file_details: [FileDetails!] } type DataProcessConfigInfo { @@ -5483,6 +5546,15 @@ type DataProcessConfig { children: [DataProcessConfigChildren] } +# 文件处理详情 +type FileDetails { + file_name: String! + status: String! + start_time: String! + end_time: String! + file_size: String! +} + # 数据处理配置项子项 type DataProcessConfigChildren { name: String @@ -12762,6 +12834,8 @@ func (ec *executionContext) fieldContext_DataProcessDetails_data(ctx context.Con return ec.fieldContext_DataProcessDetailsItem_data_process_config_info(ctx, field) case "config": return ec.fieldContext_DataProcessDetailsItem_config(ctx, field) + case "file_details": + return ec.fieldContext_DataProcessDetailsItem_file_details(ctx, field) } return nil, fmt.Errorf("no field named %q was found under type DataProcessDetailsItem", field.Name) }, @@ -13484,6 +13558,59 @@ func (ec *executionContext) fieldContext_DataProcessDetailsItem_config(ctx conte return fc, nil } +func (ec *executionContext) _DataProcessDetailsItem_file_details(ctx context.Context, field graphql.CollectedField, obj *DataProcessDetailsItem) (ret graphql.Marshaler) { + fc, err := ec.fieldContext_DataProcessDetailsItem_file_details(ctx, field) + if err != nil { + return graphql.Null + } + ctx = graphql.WithFieldContext(ctx, fc) + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + ret = graphql.Null + } + }() + resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { + ctx = rctx // use context from middleware stack in children + return obj.FileDetails, nil + }) + if err != nil { + ec.Error(ctx, err) + return graphql.Null + } + if resTmp == nil { + return graphql.Null + } + res := resTmp.([]*FileDetails) + fc.Result = res + return ec.marshalOFileDetails2ᚕᚖgithubᚗcomᚋkubeagiᚋarcadiaᚋapiserverᚋgraphᚋgeneratedᚐFileDetailsᚄ(ctx, field.Selections, res) +} + +func (ec *executionContext) fieldContext_DataProcessDetailsItem_file_details(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { + fc = &graphql.FieldContext{ + Object: "DataProcessDetailsItem", + Field: field, + IsMethod: false, + IsResolver: false, + Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { + switch field.Name { + case "file_name": + return ec.fieldContext_FileDetails_file_name(ctx, field) + case "status": + return ec.fieldContext_FileDetails_status(ctx, field) + case "start_time": + return ec.fieldContext_FileDetails_start_time(ctx, field) + case "end_time": + return ec.fieldContext_FileDetails_end_time(ctx, field) + case "file_size": + return ec.fieldContext_FileDetails_file_size(ctx, field) + } + return nil, fmt.Errorf("no field named %q was found under type FileDetails", field.Name) + }, + } + return fc, nil +} + func (ec *executionContext) _DataProcessItem_id(ctx context.Context, field graphql.CollectedField, obj *DataProcessItem) (ret graphql.Marshaler) { fc, err := ec.fieldContext_DataProcessItem_id(ctx, field) if err != nil { @@ -13833,6 +13960,50 @@ func (ec *executionContext) fieldContext_DataProcessItem_start_datetime(ctx cont return fc, nil } +func (ec *executionContext) _DataProcessItem_end_datetime(ctx context.Context, field graphql.CollectedField, obj *DataProcessItem) (ret graphql.Marshaler) { + fc, err := ec.fieldContext_DataProcessItem_end_datetime(ctx, field) + if err != nil { + return graphql.Null + } + ctx = graphql.WithFieldContext(ctx, fc) + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + ret = graphql.Null + } + }() + resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { + ctx = rctx // use context from middleware stack in children + return obj.EndDatetime, nil + }) + if err != nil { + ec.Error(ctx, err) + return graphql.Null + } + if resTmp == nil { + if !graphql.HasFieldError(ctx, fc) { + ec.Errorf(ctx, "must not be null") + } + return graphql.Null + } + res := resTmp.(string) + fc.Result = res + return ec.marshalNString2string(ctx, field.Selections, res) +} + +func (ec *executionContext) fieldContext_DataProcessItem_end_datetime(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { + fc = &graphql.FieldContext{ + Object: "DataProcessItem", + Field: field, + IsMethod: false, + IsResolver: false, + Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { + return nil, errors.New("field of type String does not have child fields") + }, + } + return fc, nil +} + func (ec *executionContext) _DataProcessItem_error_msg(ctx context.Context, field graphql.CollectedField, obj *DataProcessItem) (ret graphql.Marshaler) { fc, err := ec.fieldContext_DataProcessItem_error_msg(ctx, field) if err != nil { @@ -18532,6 +18703,226 @@ func (ec *executionContext) fieldContext_F_creationTimestamp(ctx context.Context return fc, nil } +func (ec *executionContext) _FileDetails_file_name(ctx context.Context, field graphql.CollectedField, obj *FileDetails) (ret graphql.Marshaler) { + fc, err := ec.fieldContext_FileDetails_file_name(ctx, field) + if err != nil { + return graphql.Null + } + ctx = graphql.WithFieldContext(ctx, fc) + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + ret = graphql.Null + } + }() + resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { + ctx = rctx // use context from middleware stack in children + return obj.FileName, nil + }) + if err != nil { + ec.Error(ctx, err) + return graphql.Null + } + if resTmp == nil { + if !graphql.HasFieldError(ctx, fc) { + ec.Errorf(ctx, "must not be null") + } + return graphql.Null + } + res := resTmp.(string) + fc.Result = res + return ec.marshalNString2string(ctx, field.Selections, res) +} + +func (ec *executionContext) fieldContext_FileDetails_file_name(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { + fc = &graphql.FieldContext{ + Object: "FileDetails", + Field: field, + IsMethod: false, + IsResolver: false, + Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { + return nil, errors.New("field of type String does not have child fields") + }, + } + return fc, nil +} + +func (ec *executionContext) _FileDetails_status(ctx context.Context, field graphql.CollectedField, obj *FileDetails) (ret graphql.Marshaler) { + fc, err := ec.fieldContext_FileDetails_status(ctx, field) + if err != nil { + return graphql.Null + } + ctx = graphql.WithFieldContext(ctx, fc) + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + ret = graphql.Null + } + }() + resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { + ctx = rctx // use context from middleware stack in children + return obj.Status, nil + }) + if err != nil { + ec.Error(ctx, err) + return graphql.Null + } + if resTmp == nil { + if !graphql.HasFieldError(ctx, fc) { + ec.Errorf(ctx, "must not be null") + } + return graphql.Null + } + res := resTmp.(string) + fc.Result = res + return ec.marshalNString2string(ctx, field.Selections, res) +} + +func (ec *executionContext) fieldContext_FileDetails_status(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { + fc = &graphql.FieldContext{ + Object: "FileDetails", + Field: field, + IsMethod: false, + IsResolver: false, + Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { + return nil, errors.New("field of type String does not have child fields") + }, + } + return fc, nil +} + +func (ec *executionContext) _FileDetails_start_time(ctx context.Context, field graphql.CollectedField, obj *FileDetails) (ret graphql.Marshaler) { + fc, err := ec.fieldContext_FileDetails_start_time(ctx, field) + if err != nil { + return graphql.Null + } + ctx = graphql.WithFieldContext(ctx, fc) + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + ret = graphql.Null + } + }() + resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { + ctx = rctx // use context from middleware stack in children + return obj.StartTime, nil + }) + if err != nil { + ec.Error(ctx, err) + return graphql.Null + } + if resTmp == nil { + if !graphql.HasFieldError(ctx, fc) { + ec.Errorf(ctx, "must not be null") + } + return graphql.Null + } + res := resTmp.(string) + fc.Result = res + return ec.marshalNString2string(ctx, field.Selections, res) +} + +func (ec *executionContext) fieldContext_FileDetails_start_time(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { + fc = &graphql.FieldContext{ + Object: "FileDetails", + Field: field, + IsMethod: false, + IsResolver: false, + Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { + return nil, errors.New("field of type String does not have child fields") + }, + } + return fc, nil +} + +func (ec *executionContext) _FileDetails_end_time(ctx context.Context, field graphql.CollectedField, obj *FileDetails) (ret graphql.Marshaler) { + fc, err := ec.fieldContext_FileDetails_end_time(ctx, field) + if err != nil { + return graphql.Null + } + ctx = graphql.WithFieldContext(ctx, fc) + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + ret = graphql.Null + } + }() + resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { + ctx = rctx // use context from middleware stack in children + return obj.EndTime, nil + }) + if err != nil { + ec.Error(ctx, err) + return graphql.Null + } + if resTmp == nil { + if !graphql.HasFieldError(ctx, fc) { + ec.Errorf(ctx, "must not be null") + } + return graphql.Null + } + res := resTmp.(string) + fc.Result = res + return ec.marshalNString2string(ctx, field.Selections, res) +} + +func (ec *executionContext) fieldContext_FileDetails_end_time(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { + fc = &graphql.FieldContext{ + Object: "FileDetails", + Field: field, + IsMethod: false, + IsResolver: false, + Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { + return nil, errors.New("field of type String does not have child fields") + }, + } + return fc, nil +} + +func (ec *executionContext) _FileDetails_file_size(ctx context.Context, field graphql.CollectedField, obj *FileDetails) (ret graphql.Marshaler) { + fc, err := ec.fieldContext_FileDetails_file_size(ctx, field) + if err != nil { + return graphql.Null + } + ctx = graphql.WithFieldContext(ctx, fc) + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + ret = graphql.Null + } + }() + resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { + ctx = rctx // use context from middleware stack in children + return obj.FileSize, nil + }) + if err != nil { + ec.Error(ctx, err) + return graphql.Null + } + if resTmp == nil { + if !graphql.HasFieldError(ctx, fc) { + ec.Errorf(ctx, "must not be null") + } + return graphql.Null + } + res := resTmp.(string) + fc.Result = res + return ec.marshalNString2string(ctx, field.Selections, res) +} + +func (ec *executionContext) fieldContext_FileDetails_file_size(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { + fc = &graphql.FieldContext{ + Object: "FileDetails", + Field: field, + IsMethod: false, + IsResolver: false, + Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { + return nil, errors.New("field of type String does not have child fields") + }, + } + return fc, nil +} + func (ec *executionContext) _GPT_name(ctx context.Context, field graphql.CollectedField, obj *Gpt) (ret graphql.Marshaler) { fc, err := ec.fieldContext_GPT_name(ctx, field) if err != nil { @@ -25052,6 +25443,8 @@ func (ec *executionContext) fieldContext_PaginatedDataProcessItem_data(ctx conte return ec.fieldContext_DataProcessItem_post_data_set_version(ctx, field) case "start_datetime": return ec.fieldContext_DataProcessItem_start_datetime(ctx, field) + case "end_datetime": + return ec.fieldContext_DataProcessItem_end_datetime(ctx, field) case "error_msg": return ec.fieldContext_DataProcessItem_error_msg(ctx, field) } @@ -35896,7 +36289,7 @@ func (ec *executionContext) unmarshalInputFileItem(ctx context.Context, obj inte asMap[k] = v } - fieldsInOrder := [...]string{"name"} + fieldsInOrder := [...]string{"name", "size"} for _, k := range fieldsInOrder { v, ok := asMap[k] if !ok { @@ -35910,6 +36303,13 @@ func (ec *executionContext) unmarshalInputFileItem(ctx context.Context, obj inte return it, err } it.Name = data + case "size": + ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("size")) + data, err := ec.unmarshalOString2ᚖstring(ctx, v) + if err != nil { + return it, err + } + it.Size = data } } @@ -39409,6 +39809,8 @@ func (ec *executionContext) _DataProcessDetailsItem(ctx context.Context, sel ast out.Values[i] = ec._DataProcessDetailsItem_data_process_config_info(ctx, field, obj) case "config": out.Values[i] = ec._DataProcessDetailsItem_config(ctx, field, obj) + case "file_details": + out.Values[i] = ec._DataProcessDetailsItem_file_details(ctx, field, obj) default: panic("unknown field " + strconv.Quote(field.Name)) } @@ -39480,6 +39882,11 @@ func (ec *executionContext) _DataProcessItem(ctx context.Context, sel ast.Select if out.Values[i] == graphql.Null { out.Invalids++ } + case "end_datetime": + out.Values[i] = ec._DataProcessItem_end_datetime(ctx, field, obj) + if out.Values[i] == graphql.Null { + out.Invalids++ + } case "error_msg": out.Values[i] = ec._DataProcessItem_error_msg(ctx, field, obj) default: @@ -41212,6 +41619,65 @@ func (ec *executionContext) _F(ctx context.Context, sel ast.SelectionSet, obj *F return out } +var fileDetailsImplementors = []string{"FileDetails"} + +func (ec *executionContext) _FileDetails(ctx context.Context, sel ast.SelectionSet, obj *FileDetails) graphql.Marshaler { + fields := graphql.CollectFields(ec.OperationContext, sel, fileDetailsImplementors) + + out := graphql.NewFieldSet(fields) + deferred := make(map[string]*graphql.FieldSet) + for i, field := range fields { + switch field.Name { + case "__typename": + out.Values[i] = graphql.MarshalString("FileDetails") + case "file_name": + out.Values[i] = ec._FileDetails_file_name(ctx, field, obj) + if out.Values[i] == graphql.Null { + out.Invalids++ + } + case "status": + out.Values[i] = ec._FileDetails_status(ctx, field, obj) + if out.Values[i] == graphql.Null { + out.Invalids++ + } + case "start_time": + out.Values[i] = ec._FileDetails_start_time(ctx, field, obj) + if out.Values[i] == graphql.Null { + out.Invalids++ + } + case "end_time": + out.Values[i] = ec._FileDetails_end_time(ctx, field, obj) + if out.Values[i] == graphql.Null { + out.Invalids++ + } + case "file_size": + out.Values[i] = ec._FileDetails_file_size(ctx, field, obj) + if out.Values[i] == graphql.Null { + out.Invalids++ + } + default: + panic("unknown field " + strconv.Quote(field.Name)) + } + } + out.Dispatch(ctx) + if out.Invalids > 0 { + return graphql.Null + } + + atomic.AddInt32(&ec.deferred, int32(len(deferred))) + + for label, dfs := range deferred { + ec.processDeferredGroup(graphql.DeferredGroup{ + Label: label, + Path: graphql.GetPath(ctx), + FieldSet: dfs, + Context: ctx, + }) + } + + return out +} + var gPTImplementors = []string{"GPT", "PageNode"} func (ec *executionContext) _GPT(ctx context.Context, sel ast.SelectionSet, obj *Gpt) graphql.Marshaler { @@ -45916,6 +46382,16 @@ func (ec *executionContext) marshalNF2ᚖgithubᚗcomᚋkubeagiᚋarcadiaᚋapis return ec._F(ctx, sel, v) } +func (ec *executionContext) marshalNFileDetails2ᚖgithubᚗcomᚋkubeagiᚋarcadiaᚋapiserverᚋgraphᚋgeneratedᚐFileDetails(ctx context.Context, sel ast.SelectionSet, v *FileDetails) graphql.Marshaler { + if v == nil { + if !graphql.HasFieldError(ctx, graphql.GetFieldContext(ctx)) { + ec.Errorf(ctx, "the requested element is null which the schema does not allow") + } + return graphql.Null + } + return ec._FileDetails(ctx, sel, v) +} + func (ec *executionContext) unmarshalNFileGroup2ᚖgithubᚗcomᚋkubeagiᚋarcadiaᚋapiserverᚋgraphᚋgeneratedᚐFileGroup(ctx context.Context, v interface{}) (*FileGroup, error) { res, err := ec.unmarshalInputFileGroup(ctx, v) return &res, graphql.ErrorOnPath(ctx, err) @@ -47361,6 +47837,53 @@ func (ec *executionContext) marshalOF2ᚕᚖgithubᚗcomᚋkubeagiᚋarcadiaᚋa return ret } +func (ec *executionContext) marshalOFileDetails2ᚕᚖgithubᚗcomᚋkubeagiᚋarcadiaᚋapiserverᚋgraphᚋgeneratedᚐFileDetailsᚄ(ctx context.Context, sel ast.SelectionSet, v []*FileDetails) graphql.Marshaler { + if v == nil { + return graphql.Null + } + ret := make(graphql.Array, len(v)) + var wg sync.WaitGroup + isLen1 := len(v) == 1 + if !isLen1 { + wg.Add(len(v)) + } + for i := range v { + i := i + fc := &graphql.FieldContext{ + Index: &i, + Result: &v[i], + } + ctx := graphql.WithFieldContext(ctx, fc) + f := func(i int) { + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + ret = nil + } + }() + if !isLen1 { + defer wg.Done() + } + ret[i] = ec.marshalNFileDetails2ᚖgithubᚗcomᚋkubeagiᚋarcadiaᚋapiserverᚋgraphᚋgeneratedᚐFileDetails(ctx, sel, v[i]) + } + if isLen1 { + f(i) + } else { + go f(i) + } + + } + wg.Wait() + + for _, e := range ret { + if e == graphql.Null { + return graphql.Null + } + } + + return ret +} + func (ec *executionContext) unmarshalOFileFilter2ᚖgithubᚗcomᚋkubeagiᚋarcadiaᚋapiserverᚋgraphᚋgeneratedᚐFileFilter(ctx context.Context, v interface{}) (*FileFilter, error) { if v == nil { return nil, nil diff --git a/apiserver/graph/generated/models_gen.go b/apiserver/graph/generated/models_gen.go index 745c17496..b942b110a 100644 --- a/apiserver/graph/generated/models_gen.go +++ b/apiserver/graph/generated/models_gen.go @@ -497,6 +497,7 @@ type DataProcessDetailsItem struct { ErrorMsg *string `json:"error_msg,omitempty"` DataProcessConfigInfo []*DataProcessConfigInfo `json:"data_process_config_info,omitempty"` Config []*DataProcessConfig `json:"config,omitempty"` + FileDetails []*FileDetails `json:"file_details,omitempty"` } type DataProcessFileLogInput struct { @@ -514,6 +515,7 @@ type DataProcessItem struct { PostDataSetName string `json:"post_data_set_name"` PostDataSetVersion *string `json:"post_data_set_version,omitempty"` StartDatetime string `json:"start_datetime"` + EndDatetime string `json:"end_datetime"` ErrorMsg *string `json:"error_msg,omitempty"` } @@ -798,6 +800,14 @@ type F struct { func (F) IsPageNode() {} +type FileDetails struct { + FileName string `json:"file_name"` + Status string `json:"status"` + StartTime string `json:"start_time"` + EndTime string `json:"end_time"` + FileSize string `json:"file_size"` +} + // 根据条件顾虑版本内的文件,只支持关键词搜索 type FileFilter struct { // 根据关键词搜索文件,strings.Container(fileName, keyword) @@ -818,7 +828,8 @@ type FileGroup struct { } type FileItem struct { - Name string `json:"name"` + Name string `json:"name"` + Size *string `json:"size,omitempty"` } // GPT diff --git a/apiserver/graph/schema/dataprocessing.gql b/apiserver/graph/schema/dataprocessing.gql index f9a4115c5..01bde5313 100644 --- a/apiserver/graph/schema/dataprocessing.gql +++ b/apiserver/graph/schema/dataprocessing.gql @@ -11,6 +11,7 @@ query allDataProcessListByPage($input: AllDataProcessListByPageInput!){ post_data_set_name post_data_set_version start_datetime + end_datetime error_msg } message @@ -124,6 +125,13 @@ query dataProcessDetails($input: DataProcessDetailsInput){ } } } + file_details { + file_name + status + start_time + end_time + file_size + } } message } diff --git a/apiserver/graph/schema/dataprocessing.graphqls b/apiserver/graph/schema/dataprocessing.graphqls index 4ccc5fa79..548c0757e 100644 --- a/apiserver/graph/schema/dataprocessing.graphqls +++ b/apiserver/graph/schema/dataprocessing.graphqls @@ -57,6 +57,7 @@ input AddDataProcessInput { # 文件条目 input FileItem { name: String! + size: String } # 数据处理配置条目 @@ -144,6 +145,8 @@ type DataProcessItem { post_data_set_version: String # 开始时间 start_datetime: String! + # 结束时间 + end_datetime: String! # 错误日志 error_msg: String } @@ -202,6 +205,7 @@ type DataProcessDetailsItem { error_msg: String data_process_config_info: [DataProcessConfigInfo!] config: [DataProcessConfig!] + file_details: [FileDetails!] } type DataProcessConfigInfo { @@ -227,6 +231,15 @@ type DataProcessConfig { children: [DataProcessConfigChildren] } +# 文件处理详情 +type FileDetails { + file_name: String! + status: String! + start_time: String! + end_time: String! + file_size: String! +} + # 数据处理配置项子项 type DataProcessConfigChildren { name: String diff --git a/deploy/charts/arcadia/templates/pg-init-data-configmap.yaml b/deploy/charts/arcadia/templates/pg-init-data-configmap.yaml index c17fdcf19..ceaf5f7e4 100644 --- a/deploy/charts/arcadia/templates/pg-init-data-configmap.yaml +++ b/deploy/charts/arcadia/templates/pg-init-data-configmap.yaml @@ -275,6 +275,7 @@ data: end_datetime character varying(64) COLLATE pg_catalog."default", status character varying(64) COLLATE pg_catalog."default", error_msg text COLLATE pg_catalog."default", + exc_msg text COLLATE pg_catalog."default", create_datetime character varying(64) COLLATE pg_catalog."default", create_user character varying(64) COLLATE pg_catalog."default", create_program character varying(64) COLLATE pg_catalog."default", @@ -291,6 +292,7 @@ data: COMMENT ON COLUMN public.data_process_task_log.end_datetime IS '结束时间'; COMMENT ON COLUMN public.data_process_task_log.status IS '状态'; COMMENT ON COLUMN public.data_process_task_log.error_msg IS '错误信息'; + COMMENT ON COLUMN public.data_process_task_log.exc_msg IS '异常信息'; COMMENT ON COLUMN public.data_process_task_log.create_datetime IS '创建时间'; COMMENT ON COLUMN public.data_process_task_log.create_user IS '创建人'; COMMENT ON COLUMN public.data_process_task_log.create_program IS '创建程序'; diff --git a/pypi/data-processing/db-scripts/change/20240319-schema.sql b/pypi/data-processing/db-scripts/change/20240319-schema.sql new file mode 100644 index 000000000..97956f8c3 --- /dev/null +++ b/pypi/data-processing/db-scripts/change/20240319-schema.sql @@ -0,0 +1,3 @@ +ALTER TABLE data_process_task_log ADD COLUMN exc_msg text; + +COMMENT ON COLUMN data_process_task_log.exc_msg IS '异常信息'; \ No newline at end of file diff --git a/pypi/data-processing/requirements.txt b/pypi/data-processing/requirements.txt index 5e10b699d..b562fa98b 100644 --- a/pypi/data-processing/requirements.txt +++ b/pypi/data-processing/requirements.txt @@ -27,3 +27,4 @@ bs4==0.0.1 playwright==1.40.0 pillow==10.2.0 html2text==2020.1.16 +cryptography==42.0.5 \ No newline at end of file diff --git a/pypi/data-processing/src/database_operate/data_process_db_operate.py b/pypi/data-processing/src/database_operate/data_process_db_operate.py index c6775bd4b..e24c33757 100644 --- a/pypi/data-processing/src/database_operate/data_process_db_operate.py +++ b/pypi/data-processing/src/database_operate/data_process_db_operate.py @@ -37,6 +37,7 @@ def list_by_page(req_json, pool): dpt.post_data_set_name, dpt.post_data_set_version, dpt.start_datetime, + dpt.end_datetime, dptl.error_msg from public.data_process_task dpt diff --git a/pypi/data-processing/src/service/data_process_service.py b/pypi/data-processing/src/service/data_process_service.py index 048a54d35..95ecc724e 100644 --- a/pypi/data-processing/src/service/data_process_service.py +++ b/pypi/data-processing/src/service/data_process_service.py @@ -154,6 +154,10 @@ def info_by_id(req_json, pool): data["config"] = config_list_for_result + data["file_details"] = _set_file_status( + data.get("file_names"), task_id=id, conn_pool=pool + ) + logger.debug(f"{log_tag_const.DATA_PROCESS_DETAIL} The response data is: \n{data}") return {"status": 200, "message": "", "data": data} @@ -265,6 +269,7 @@ def _get_and_set_basic_detail_info(from_result, task_id, conn_pool): from_result["name"] = detail_info_data["name"] from_result["status"] = detail_info_data["status"] from_result["file_type"] = detail_info_data["file_type"] + from_result["file_names"] = detail_info_data["file_names"] from_result["file_num"] = file_num from_result["pre_dataset_name"] = detail_info_data["pre_data_set_name"] from_result["pre_dataset_version"] = detail_info_data["pre_data_set_version"] @@ -763,3 +768,18 @@ def _get_document_chunk_preview(task_id, conn_pool): ) return chunk_list_preview + + +def _set_file_status(file_names, task_id, conn_pool): + detail_info_params = {"task_id": task_id} + res = data_process_document_db_operate.list_file_by_task_id( + detail_info_params, pool=conn_pool + ) + + documents = res.get("data") + for document in documents: + for item in file_names: + if item.get("name") == document.get("file_name"): + document["file_size"] = item.get("size") + + return documents