// Copyright 2020 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package sink

import (
	"context"
	"fmt"
	"mime"
	"net/http"
	"os"
	"strconv"
	"sync"
	"sync/atomic"

	"go.chromium.org/luci/common/errors"
	"go.chromium.org/luci/common/sync/dispatcher"
	"go.chromium.org/luci/common/sync/dispatcher/buffer"

	"go.chromium.org/luci/resultdb/pbutil"
	pb "go.chromium.org/luci/resultdb/proto/v1"
	sinkpb "go.chromium.org/luci/resultdb/sink/proto/v1"
)

// uploadTask is a single artifact to be uploaded, together with metadata that
// is computed once when the task is created.
type uploadTask struct {
	// art is the artifact to upload. Its Contents, FilePath, ContentType and
	// GcsUri are consulted by CreateRequest.
	art        *sinkpb.Artifact
	// artName is the artifact resource name. CreateRequest parses it with
	// pbutil.ParseArtifactName to derive the request parent and artifact ID.
	artName    string
	// size is the content size in bytes: len(art.Contents), or the size
	// reported by os.Stat when art has a FilePath.
	size       int64 // content size
	// testStatus is copied verbatim into the uploaded pb.Artifact.
	testStatus pb.TestStatus
}

// newUploadTask returns an uploadTask for art, identified by the artifact
// resource name and tagged with testStatus.
//
// The task's size starts as the length of the inline contents. When the
// artifact refers to a file instead, the file is stat'ed and its size is used.
// An error is returned if the stat fails (e.g. the file is missing or not
// readable), or if the path is a directory or any other non-regular file.
func newUploadTask(name string, art *sinkpb.Artifact, testStatus pb.TestStatus) (*uploadTask, error) {
	task := &uploadTask{
		art:        art,
		artName:    name,
		size:       int64(len(art.GetContents())),
		testStatus: testStatus,
	}

	path := art.GetFilePath()
	if path == "" {
		return task, nil
	}

	// Record the size up front so that scheduling and batching never need
	// another system call to learn it.
	info, err := os.Stat(path)
	if err != nil {
		return nil, errors.Annotate(err, "querying file info").Err()
	}
	if mode := info.Mode(); !mode.IsRegular() {
		if info.IsDir() {
			// A directory's mode in binary is 1000...0, which is unhelpful,
			// so say what it is explicitly.
			return nil, errors.Reason("%q is a directory", path).Err()
		}
		return nil, errors.Reason("%q is not a regular file: %s", path, strconv.FormatInt(int64(mode), 2)).Err()
	}
	task.size = info.Size()
	return task, nil
}

// CreateRequest builds the CreateArtifactRequest for the upload task.
//
// If the artifact has a FilePath, the file is opened and read on every call.
// Callers that need the request more than once should keep the returned value
// instead of calling this again.
//
// It returns an error if reading the file fails. For a non-GCS artifact, it
// also returns an error if the content size no longer matches the size
// recorded when the task was created.
//
// It panics if t.artName is not a valid artifact name.
func (t *uploadTask) CreateRequest() (*pb.CreateArtifactRequest, error) {
	invID, tID, rID, aID, err := pbutil.ParseArtifactName(t.artName)
	if err != nil {
		// Unreachable in practice: uploadTasks are only created from
		// validated artifacts.
		panic(fmt.Sprintf("invalid uploadTask.artName %q: %s", t.artName, err))
	}

	var parent string
	if tID == "" {
		// The artifact belongs to the invocation itself.
		parent = pbutil.InvocationName(invID)
	} else {
		parent = pbutil.TestResultName(invID, tID, rID)
	}

	contents := t.art.GetContents()
	if path := t.art.GetFilePath(); path != "" {
		data, readErr := os.ReadFile(path)
		if readErr != nil {
			return nil, readErr
		}
		contents = data
	}

	gcsURI := t.art.GetGcsUri()
	// Batching relies on the size measured at task creation. Rather than
	// re-plan a batch, reject artifacts whose contents changed since then.
	// Test frameworks are expected to send finalized artifacts only. GCS
	// artifacts carry no contents here, so they are exempt.
	if gcsURI == "" && int64(len(contents)) != t.size {
		return nil, errors.Reason(
			"the size of the artifact contents changed from %d to %d",
			t.size, len(contents)).Err()
	}

	return &pb.CreateArtifactRequest{
		Parent: parent,
		Artifact: &pb.Artifact{
			ArtifactId: aID,
			// Fall back to sniffing the contents when no type was given.
			ContentType: artifactContentType(t.art.GetContentType(), contents),
			SizeBytes:   t.size,
			Contents:    contents,
			GcsUri:      gcsURI,
			TestStatus:  t.testStatus,
		},
	}, nil
}

// artifactChannel routes uploadTasks to one of two dispatcher channels,
// depending on the artifact size, and supports a one-shot close-and-drain.
type artifactChannel struct {
	// batchChannel uploads artifacts via pb.BatchCreateArtifacts().
	//
	// This batches input artifacts and uploads them all at once.
	// This is suitable for uploading a large number of small artifacts.
	//
	// The downside of this channel is that there is a limit on the maximum size of
	// an artifact that can be included in a batch. Use streamChannel for artifacts
	// greater than ServerConfig.MaxBatchableArtifactSize.
	batchChannel dispatcher.Channel[any]

	// streamChannel uploads artifacts in a streaming manner via HTTP.
	//
	// This is suitable for uploading large files, but with limited parallelism.
	// Use batchChannel, if possible.
	streamChannel dispatcher.Channel[any]

	// wgActive counts the goroutines currently inside schedule.
	//
	// schedule can be invoked by multiple goroutines in parallel. wgActive ensures
	// that all of them finish enqueuing tasks to the channels before
	// closeAndDrain closes and drains the channels.
	//
	// NOTE(review): schedule calls wgActive.Add concurrently with closeAndDrain's
	// Wait. The sync.WaitGroup docs require Adds that start from a zero counter to
	// happen before Wait, so this pattern can technically violate that contract.
	// Consider a mutex-guarded closed flag if this ever trips the race detector.
	wgActive sync.WaitGroup

	// closed is 1 once closeAndDrain has started closing and draining the
	// channels, and 0 otherwise. It is accessed only through sync/atomic.
	closed int32

	// cfg supplies, among other things, MaxBatchableArtifactSize, which
	// schedule uses to pick a channel for each task.
	cfg *ServerConfig
}

// newArtifactChannel creates an artifactChannel. Both of its dispatcher
// channels share a single artifactUploader configured from cfg.
//
// It panics if either dispatcher channel cannot be created.
func newArtifactChannel(ctx context.Context, cfg *ServerConfig) *artifactChannel {
	uploader := artifactUploader{
		MaxBatchable: cfg.MaxBatchableArtifactSize,
		Recorder:     cfg.Recorder,
		StreamClient: cfg.ArtifactStreamClient,
		StreamHost:   cfg.ArtifactStreamHost,
	}
	leases := int(cfg.ArtChannelMaxLeases)

	// Batched uploads.
	//
	// A BatchCreateArtifactsRequest can carry up to 500 requests and at most
	// 10MiB of artifact contents. uploadTaskSlicer slices the tasks so that
	// both the count and the size limits are respected.
	//
	// Keep BatchItemsMax >= 500 to improve the chance that a request carries
	// the full 500 artifacts.
	//
	// Depending on the expected distribution of artifact sizes, consider
	// tuning ServerConfig.MaxBatchableArtifactSize and BatchDuration to balance
	// upload latency against throughput.
	//
	// For more details, visit
	// https://godoc.org/go.chromium.org/luci/resultdb/proto/v1#BatchCreateArtifactsRequest
	batchOpts := &dispatcher.Options{
		Buffer: buffer.Options{
			BatchItemsMax: 500,
			MaxLeases:     leases,
			FullBehavior:  &buffer.BlockNewItems{MaxItems: 8000},
		},
	}
	batchCh, err := dispatcher.NewChannel[any](ctx, batchOpts, func(b *buffer.Batch) error {
		return errors.Annotate(uploader.BatchUpload(ctx, b), "BatchUpload").Err()
	})
	if err != nil {
		panic(fmt.Sprintf("failed to create batch channel for artifacts: %s", err))
	}

	// Streamed uploads. Each batch must hold exactly one task, because the
	// handler below only looks at b.Data[0].
	streamOpts := &dispatcher.Options{
		Buffer: buffer.Options{
			BatchItemsMax: 1,
			MaxLeases:     leases,
			FullBehavior:  &buffer.BlockNewItems{MaxItems: 4000},
		},
	}
	streamCh, err := dispatcher.NewChannel[any](ctx, streamOpts, func(b *buffer.Batch) error {
		task := b.Data[0].Item.(*uploadTask)
		return errors.Annotate(uploader.StreamUpload(ctx, task, cfg.UpdateToken), "StreamUpload").Err()
	})
	if err != nil {
		panic(fmt.Sprintf("failed to create stream channel for artifacts: %s", err))
	}

	return &artifactChannel{
		batchChannel:  batchCh,
		streamChannel: streamCh,
		cfg:           cfg,
	}
}

// closeAndDrain stops the artifactChannel from accepting new tasks, waits for
// in-flight schedule calls to finish enqueuing, and then closes and drains the
// batch and stream channels in parallel. Only the first call does anything.
// Later calls return immediately.
func (c *artifactChannel) closeAndDrain(ctx context.Context) {
	// Flip closed from 0 to 1 so that schedule rejects new tasks. Losing the
	// race means someone else is already draining.
	if swapped := atomic.CompareAndSwapInt32(&c.closed, 0, 1); !swapped {
		return
	}

	// Let every schedule call that got in before the flip finish enqueuing
	// its artifacts.
	c.wgActive.Wait()

	var draining sync.WaitGroup
	drain := func(ch *dispatcher.Channel[any]) {
		defer draining.Done()
		ch.CloseAndDrain(ctx)
	}
	draining.Add(2)
	go drain(&c.batchChannel)
	go drain(&c.streamChannel)
	draining.Wait()
}

// schedule enqueues tasks for upload. A task whose size exceeds
// cfg.MaxBatchableArtifactSize goes to streamChannel, and every other task
// goes to batchChannel. Each send may block while the destination's buffer is
// full (both use buffer.BlockNewItems).
//
// Once closeAndDrain has started, schedule drops all tasks silently.
//
// NOTE(review): the wgActive.Add here can race with closeAndDrain's
// wgActive.Wait, which the sync.WaitGroup contract forbids when the counter is
// zero. Confirm this is acceptable, or guard it with a mutex.
func (c *artifactChannel) schedule(tasks ...*uploadTask) {
	c.wgActive.Add(1)
	defer c.wgActive.Done()

	if atomic.LoadInt32(&c.closed) == 1 {
		// Already closing: drop the artifacts.
		return
	}

	for _, t := range tasks {
		dest := c.batchChannel.C
		if t.size > c.cfg.MaxBatchableArtifactSize {
			dest = c.streamChannel.C
		}
		dest <- t
	}
}

// artifactContentType returns contentType unchanged when it is non-empty or
// when there are no contents to inspect. Otherwise it sniffs the media type
// from contents with http.DetectContentType, which looks at no more than the
// first 512 bytes, and strips any parameters such as "; charset=utf-8". It
// returns "" if the sniffed type cannot be parsed.
func artifactContentType(contentType string, contents []byte) string {
	switch {
	case contentType != "":
		return contentType
	case len(contents) == 0:
		return contentType
	}

	sniffed := http.DetectContentType(contents)
	if mediaType, _, err := mime.ParseMediaType(sniffed); err == nil {
		return mediaType
	}
	return ""
}
