blob: a29f024656a33abfda3e56c5bf2c6d6129d41740 [file] [log] [blame]
// Copyright 2018 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package model
import (
"bytes"
"compress/zlib"
"encoding/json"
"io"
"time"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/types/known/structpb"
"google.golang.org/protobuf/types/known/timestamppb"
"go.chromium.org/luci/common/errors"
"go.chromium.org/luci/gae/service/datastore"
api "go.chromium.org/luci/cipd/api/cipd/v1"
)
// ProcessingResult holds information extracted from the package instance file.
//
// It is obtained during an asynchronous post processing step triggered after
// the instance is uploaded. Immutable.
//
// Entity ID is a processor name used to extract it. Parent entity is
// PackageInstance the information was extracted from.
//
// Success tells which of Error and ResultRaw is meaningful. ResultRaw holds
// zlib-compressed JSON: populate it with WriteResult and decode it with
// ReadResult or ReadResultIntoStruct. A zero CreatedTs is reported by Proto as
// a processor that has not produced a result yet.
type ProcessingResult struct {
_kind string `gae:"$kind,ProcessingResult"`
// NOTE(review): presumably receives stored properties that do not map to any
// field below; confirm against the luci/gae struct tag documentation.
_extra datastore.PropertyMap `gae:"-,extra"`
ProcID string `gae:"$id"` // processor that generated the result
Instance *datastore.Key `gae:"$parent"` // instance it was generated from
CreatedTs time.Time `gae:"created_ts"` // when it was generated
Success bool `gae:"success"` // mostly for indexing
Error string `gae:"error,noindex"` // for Success == false
ResultRaw []byte `gae:"result,noindex"` // for Success == true
}
// WriteResult stores 'r' in ResultRaw as zlib-compressed JSON, replacing any
// previous value.
//
// 'r' must marshal into a JSON object, e.g. '{...}'. If marshaling fails, the
// output is not an object, or compression fails, an error is returned and
// ResultRaw is left as it was.
func (p *ProcessingResult) WriteResult(r any) error {
	serialized, err := json.Marshal(r)
	if err != nil {
		return errors.Annotate(err, "failed to serialize the result").Err()
	}
	// Only JSON objects are accepted: scalars, arrays and null are rejected.
	if len(serialized) == 0 || serialized[0] != '{' {
		return errors.Reason("the result is not a JSON object").Err()
	}

	var compressed bytes.Buffer
	w := zlib.NewWriter(&compressed)
	if _, err := w.Write(serialized); err != nil {
		// The write error is the one worth reporting; closing here is best effort.
		w.Close()
		return errors.Annotate(err, "failed to compress the result").Err()
	}
	// Close flushes the remaining compressed data into the buffer.
	if err := w.Close(); err != nil {
		return errors.Annotate(err, "failed to close zlib writer").Err()
	}

	p.ResultRaw = compressed.Bytes()
	return nil
}
// ReadResult decompresses ResultRaw and JSON-decodes it into 'r'.
//
// 'r' is passed as is to the JSON decoder. When ResultRaw is empty the call is
// a no-op: 'r' is not touched and nil is returned.
func (p *ProcessingResult) ReadResult(r any) error {
	if len(p.ResultRaw) == 0 {
		return nil
	}

	zr, err := zlib.NewReader(bytes.NewReader(p.ResultRaw))
	if err != nil {
		return errors.Annotate(err, "failed to open the blob for zlib decompression").Err()
	}

	// The reader is always closed; a decode failure takes precedence over a
	// close failure when both happen.
	decodeErr := json.NewDecoder(zr).Decode(r)
	closeErr := zr.Close()
	switch {
	case decodeErr != nil:
		return errors.Annotate(decodeErr, "failed to decompress or deserialize the result").Err()
	case closeErr != nil:
		return errors.Annotate(closeErr, "failed to close zlib reader").Err()
	}
	return nil
}
// ReadResultIntoStruct decompresses ResultRaw and parses it as JSON into the
// given protobuf Struct.
//
// When ResultRaw is empty the call is a no-op: 's' is not touched and nil is
// returned.
func (p *ProcessingResult) ReadResultIntoStruct(s *structpb.Struct) error {
	if len(p.ResultRaw) == 0 {
		return nil
	}

	zr, err := zlib.NewReader(bytes.NewReader(p.ResultRaw))
	if err != nil {
		return errors.Annotate(err, "failed to open the blob for zlib decompression").Err()
	}

	// Decompress fully before parsing. The reader is always closed; a read
	// failure takes precedence over a close failure when both happen.
	raw, readErr := io.ReadAll(zr)
	closeErr := zr.Close()
	switch {
	case readErr != nil:
		return errors.Annotate(readErr, "failed to decompress the result").Err()
	case closeErr != nil:
		return errors.Annotate(closeErr, "failed to close zlib reader").Err()
	}

	if err := protojson.Unmarshal(raw, s); err != nil {
		return errors.Annotate(err, "failed to deserialize the result").Err()
	}
	return nil
}
// Proto converts this entity into a cipd.Processor proto.
//
// The processor state is derived as follows:
//   - zero CreatedTs: PENDING, only the ID is populated;
//   - Success is false: FAILED, with FinishedTs and Error populated;
//   - Success is true: SUCCEEDED, with FinishedTs populated and Result set to
//     the decoded ResultRaw, unless it decodes into a Struct with no fields.
//
// Returns an error (and a nil proto) only if the stored result cannot be
// decoded.
func (p *ProcessingResult) Proto() (*api.Processor, error) {
	proc := &api.Processor{Id: p.ProcID}

	switch {
	case p.CreatedTs.IsZero():
		// No result has been recorded yet.
		proc.State = api.Processor_PENDING

	case !p.Success:
		proc.FinishedTs = timestamppb.New(p.CreatedTs)
		proc.State = api.Processor_FAILED
		proc.Error = p.Error

	default:
		proc.FinishedTs = timestamppb.New(p.CreatedTs)
		proc.State = api.Processor_SUCCEEDED
		decoded := &structpb.Struct{}
		if err := p.ReadResultIntoStruct(decoded); err != nil {
			return nil, err
		}
		// Leave Result unset rather than pointing it at an empty Struct.
		if len(decoded.Fields) != 0 {
			proc.Result = decoded
		}
	}

	return proc, nil
}