diff --git a/cmd/fdsn-quake-consumer/main.go b/cmd/fdsn-quake-consumer/main.go index a815298f..5d69b249 100644 --- a/cmd/fdsn-quake-consumer/main.go +++ b/cmd/fdsn-quake-consumer/main.go @@ -25,11 +25,12 @@ import ( ) const ( - healthCheckAged = 5 * time.Minute //need to have a good heartbeat within this time - healthCheckStartup = 5 * time.Minute //ignore heartbeat messages for this time after starting - healthCheckTimeout = 30 * time.Second //health check timeout - healthCheckService = ":7777" //end point to listen to for SOH checks - healthCheckPath = "/soh" + sqs_visibility_timeout = 60 //seconds + healthCheckAged = 5 * time.Minute //need to have a good heartbeat within this time + healthCheckStartup = 5 * time.Minute //ignore heartbeat messages for this time after starting + healthCheckTimeout = 30 * time.Second //health check timeout + healthCheckService = ":7777" //end point to listen to for SOH checks + healthCheckPath = "/soh" ) var ( @@ -114,7 +115,7 @@ loop1: for { health.Ok() // update soh - r, err = sqsClient.ReceiveWithContext(ctx, queueURL, 600) + r, err = sqsClient.ReceiveWithContext(ctx, queueURL, sqs_visibility_timeout) if err != nil { switch { case sqs.IsNoMessagesError(err): @@ -132,6 +133,9 @@ loop1: err = metrics.DoProcess(&n, []byte(r.Body)) if err != nil { log.Printf("problem processing message, skipping deletion for redelivery: %s", err) + if err1 := sqsClient.SetMessageVisibility(queueURL, r.ReceiptHandle, 0); err1 != nil { + log.Printf("error changing message visibility: %s", err1.Error()) + } continue } diff --git a/go.mod b/go.mod index 76abbbd5..7408e025 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/GeoNet/fdsn go 1.23 require ( - github.com/GeoNet/kit v0.0.0-20241129025613-745247c4fb1c + github.com/GeoNet/kit v0.0.0-20250803205759-df08ce98e1ed github.com/gorilla/schema v1.4.1 github.com/joho/godotenv v1.5.1 github.com/lib/pq v1.10.3 diff --git a/go.sum b/go.sum index 8e893879..261fd0cd 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,5 @@ -github.com/GeoNet/kit v0.0.0-20241129025613-745247c4fb1c h1:yrk9pbtLaPEWmmrx2v5U457PnEzIg1o+Q6X0hOZWWS0= -github.com/GeoNet/kit v0.0.0-20241129025613-745247c4fb1c/go.mod h1:O5T12WrCE1SOD52A9Ye//Wjl3HX7BFZv3dXzDz3adMo= +github.com/GeoNet/kit v0.0.0-20250803205759-df08ce98e1ed h1:BzEg8z1uSdNbv/Ivmf6NZAa1WjcbI+YzR9lxVlp8BZE= +github.com/GeoNet/kit v0.0.0-20250803205759-df08ce98e1ed/go.mod h1:XeIegOtPHnYCcsPZjTWMdmcUkUowOmIxVNhlwOlyjhw= github.com/aws/aws-sdk-go-v2 v1.25.3 h1:xYiLpZTQs1mzvz5PaI6uR0Wh57ippuEthxS4iK5v0n0= github.com/aws/aws-sdk-go-v2 v1.25.3/go.mod h1:35hUlJVYd+M++iLI3ALmVwMOyRYMmRqUXpTtRGW+K9I= github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.1 h1:gTK2uhtAPtFcdRRJilZPx8uJLL2J85xK11nKtWL0wfU= diff --git a/vendor/github.com/GeoNet/kit/aws/s3/s3.go b/vendor/github.com/GeoNet/kit/aws/s3/s3.go index 9a0cdabe..e87669fc 100644 --- a/vendor/github.com/GeoNet/kit/aws/s3/s3.go +++ b/vendor/github.com/GeoNet/kit/aws/s3/s3.go @@ -121,6 +121,11 @@ func (s *S3) Ready() bool { return s.client != nil } +// Client returns the underlying S3 client. +func (s *S3) Client() *s3.Client { + return s.client +} + // Get gets the object referred to by key and version from bucket and writes it into b. // Version can be empty. func (s *S3) Get(bucket, key, version string, b *bytes.Buffer) error { @@ -562,3 +567,28 @@ func (s *S3) Copy(bucket, key, source string) error { return err } + +// CreateBucket creates a bucket. +func (s *S3) CreateBucket(bucket string) error { + config := types.CreateBucketConfiguration{ + LocationConstraint: types.BucketLocationConstraint(s.client.Options().Region), + } + + input := s3.CreateBucketInput{ + Bucket: aws.String(bucket), + CreateBucketConfiguration: &config, + } + _, err := s.client.CreateBucket(context.TODO(), &input) + + return err +} + +// DeleteBucket deletes a bucket. +func (s *S3) DeleteBucket(bucket string) error { + input := s3.DeleteBucketInput{ + Bucket: aws.String(bucket), + } + _, err := s.client.DeleteBucket(context.TODO(), &input) + + return err +} diff --git a/vendor/github.com/GeoNet/kit/aws/sqs/sqs.go b/vendor/github.com/GeoNet/kit/aws/sqs/sqs.go index 58c7a4b7..cc10211b 100644 --- a/vendor/github.com/GeoNet/kit/aws/sqs/sqs.go +++ b/vendor/github.com/GeoNet/kit/aws/sqs/sqs.go @@ -92,6 +92,11 @@ func (s *SQS) Ready() bool { return s.client != nil } +// Client returns the underlying SQS client. +func (s *SQS) Client() *sqs.Client { + return s.client +} + // Receive receives a raw message or error from the queue. // After a successful receive the message will be in flight // until it is either deleted or the visibility timeout expires @@ -122,34 +127,40 @@ func (s *SQS) ReceiveWithContextAttributes(ctx context.Context, queueURL string, WaitTimeSeconds: 20, AttributeNames: attrs, } - return s.receiveMessage(ctx, &input) + msgs, err := s.receiveMessages(ctx, &input) + if err != nil { + return Raw{}, err + } + return msgs[0], err } -// receiveMessage is the common code used internally to receive an SQS message based +// receiveMessages is the common code used internally to receive an SQS messages based // on the provided input. -func (s *SQS) receiveMessage(ctx context.Context, input *sqs.ReceiveMessageInput) (Raw, error) { +func (s *SQS) receiveMessages(ctx context.Context, input *sqs.ReceiveMessageInput) ([]Raw, error) { r, err := s.client.ReceiveMessage(ctx, input) if err != nil { - return Raw{}, err + return []Raw{}, err } switch { case r == nil || len(r.Messages) == 0: // no message received - return Raw{}, ErrNoMessages + return []Raw{}, ErrNoMessages - case len(r.Messages) == 1: - raw := r.Messages[0] + case len(r.Messages) >= 1: - m := Raw{ - Body: aws.ToString(raw.Body), - ReceiptHandle: aws.ToString(raw.ReceiptHandle), - Attributes: raw.Attributes, + messages := make([]Raw, len(r.Messages)) + for i := range r.Messages { + messages[i] = Raw{ + Body: aws.ToString(r.Messages[i].Body), + ReceiptHandle: aws.ToString(r.Messages[i].ReceiptHandle), + Attributes: r.Messages[i].Attributes, + } } - return m, nil + return messages, nil default: - return Raw{}, fmt.Errorf("received unexpected messages: %d", len(r.Messages)) + return []Raw{}, fmt.Errorf("received unexpected number of messages: %d", len(r.Messages)) // Probably an impossible case } } @@ -164,7 +175,28 @@ func (s *SQS) ReceiveWithContext(ctx context.Context, queueURL string, visibilit VisibilityTimeout: visibilityTimeout, WaitTimeSeconds: 20, } - return s.receiveMessage(ctx, &input) + msgs, err := s.receiveMessages(ctx, &input) + if err != nil { + return Raw{}, err + } + return msgs[0], err +} + +// ReceiveBatch is similar to Receive, however it can return up to 10 messages. +func (s *SQS) ReceiveBatch(ctx context.Context, queueURL string, visibilityTimeout int32) ([]Raw, error) { + + input := sqs.ReceiveMessageInput{ + QueueUrl: aws.String(queueURL), + MaxNumberOfMessages: 10, + VisibilityTimeout: visibilityTimeout, + WaitTimeSeconds: 20, + } + + msgs, err := s.receiveMessages(ctx, &input) + if err != nil { + return []Raw{}, err + } + return msgs, nil } // Delete deletes the message referred to by receiptHandle from the queue. @@ -179,6 +211,19 @@ func (s *SQS) Delete(queueURL, receiptHandle string) error { return err } +// SetMessageVisibility sets the visibility timeout for a message. +func (s *SQS) SetMessageVisibility(queueURL, receiptHandle string, visibilityTimeout int32) error { + params := sqs.ChangeMessageVisibilityInput{ + QueueUrl: aws.String(queueURL), + ReceiptHandle: aws.String(receiptHandle), + VisibilityTimeout: visibilityTimeout, + } + + _, err := s.client.ChangeMessageVisibility(context.TODO(), ¶ms) + + return err +} + // Send sends the message body to the SQS queue referred to by queueURL. func (s *SQS) Send(queueURL string, body string) error { params := sqs.SendMessageInput{ @@ -226,44 +271,284 @@ func (s *SQS) SendFifoMessage(queue, group, dedupe string, msg []byte) (string, return "", nil } -// Leverage the sendbatch api for uploading large numbers of messages -func (s *SQS) SendBatch(ctx context.Context, queueURL string, bodies []string) error { - if len(bodies) > 11 { - return errors.New("too many messages to batch") +type SendBatchError struct { + Err error + Info []SendBatchErrorEntry +} +type SendBatchErrorEntry struct { + Entry types.BatchResultErrorEntry + Index int +} + +func (s *SendBatchError) Error() string { + return fmt.Sprintf("%v: %v messages failed to send", s.Err, len(s.Info)) +} +func (s *SendBatchError) Unwrap() error { + return s.Err +} + +type SendNBatchError struct { + Errors []error + Info []SendBatchErrorEntry +} + +func (s *SendNBatchError) Error() string { + var allErrors string + for _, err := range s.Errors { + allErrors += fmt.Sprintf("%s,", err.Error()) } + allErrors = strings.TrimSuffix(allErrors, ",") + return fmt.Sprintf("%v error(s) sending batches: %s", len(s.Errors), allErrors) +} + +// SendBatch sends up to 10 messages to a given SQS queue with one API call. +// If an error occurs on any or all messages, a SendBatchError is returned that lets +// the caller know the index of the message/s in bodies that failed. +func (s *SQS) SendBatch(ctx context.Context, queueURL string, bodies []string) error { + var err error entries := make([]types.SendMessageBatchRequestEntry, len(bodies)) for j, body := range bodies { entries[j] = types.SendMessageBatchRequestEntry{ - Id: aws.String(fmt.Sprintf("gamitjob%d", j)), + Id: aws.String(fmt.Sprintf("message-%d", j)), MessageBody: aws.String(body), } } - _, err = s.client.SendMessageBatch(ctx, &sqs.SendMessageBatchInput{ + output, err := s.client.SendMessageBatch(ctx, &sqs.SendMessageBatchInput{ Entries: entries, QueueUrl: &queueURL, }) - return err + if err != nil { + info := make([]SendBatchErrorEntry, len(entries)) + for i := range entries { + info[i] = SendBatchErrorEntry{ + Index: i, + } + } + return &SendBatchError{Err: err, Info: info} + } + if len(output.Failed) > 0 { + info := make([]SendBatchErrorEntry, len(output.Failed)) + for i, entry := range output.Failed { + for j, msg := range entries { + if aws.ToString(msg.Id) == aws.ToString(entry.Id) { + info[i] = SendBatchErrorEntry{ + Entry: entry, + Index: j, + } + break + } + } + } + return &SendBatchError{Err: errors.New("partial message failure"), Info: info} + } + return nil } -func (s *SQS) SendNBatch(ctx context.Context, queueURL string, bodies []string) error { +// SendNBatch sends any number of messages to a given SQS queue via a series of SendBatch calls. +// If an error occurs on any or all messages, a SendNBatchError is returned that lets +// the caller know the index of the message/s in bodies that failed. +// Returns the number of API calls to SendBatch made. +func (s *SQS) SendNBatch(ctx context.Context, queueURL string, bodies []string) (int, error) { + + const ( + maxCount = 10 + maxSize = 262144 // 256 KiB + ) + + allErrors := make([]error, 0) + allInfo := make([]SendBatchErrorEntry, 0) + + batchesSent := 0 + + batch := make([]int, 0) + totalSize := 0 + + sendBatch := func() { + batchBodies := make([]string, len(batch)) + + for i, batchIndex := range batch { + batchBodies[i] = bodies[batchIndex] + } + + err := s.SendBatch(ctx, queueURL, batchBodies) + var sbe *SendBatchError + if errors.As(err, &sbe) { + allErrors = append(allErrors, err) + + // Update index so that index refers to the position in given bodies slice. + for i := range sbe.Info { + sbe.Info[i].Index = batch[sbe.Info[i].Index] + } + + allInfo = append(allInfo, sbe.Info...) + } + + batchesSent++ + batch = batch[:0] + totalSize = 0 + } + + for i, body := range bodies { + + // Check if any single message is too big + if len(body) > maxSize { + allErrors = append(allErrors, errors.New("message too big to send")) + allInfo = append(allInfo, SendBatchErrorEntry{ + Index: i, + }) + continue + } + // If adding the current message would exceed the batch max size or count, send the current batch. + if totalSize+len(body) > maxSize || len(batch) == maxCount { + sendBatch() + } + batch = append(batch, i) + totalSize += len(body) + } + + if len(batch) > 0 { + sendBatch() + } + + if len(allErrors) > 0 { + return batchesSent, &SendNBatchError{ + Errors: allErrors, + Info: allInfo, + } + } + + return batchesSent, nil +} + +type DeleteBatchError struct { + Err error + Info []DeleteBatchErrorEntry +} + +type DeleteBatchErrorEntry struct { + Entry types.BatchResultErrorEntry + Index int +} + +func (d *DeleteBatchError) Error() string { + return fmt.Sprintf("%v: %v messages failed to delete", d.Err, len(d.Info)) +} + +func (d *DeleteBatchError) Unwrap() error { + return d.Err +} + +type DeleteNBatchError struct { + Errors []error + Info []DeleteBatchErrorEntry +} + +func (s *DeleteNBatchError) Error() string { + var allErrors string + for _, err := range s.Errors { + allErrors += fmt.Sprintf("%s,", err.Error()) + } + allErrors = strings.TrimSuffix(allErrors, ",") + return fmt.Sprintf("%v error(s) deleting batches: %s", len(s.Errors), allErrors) +} + +// DeleteBatch deletes up to 10 messages from an SQS queue in a single batch. +// If an error occurs on any or all messages, a DeleteBatchError is returned that lets +// the caller know the indice/s in receiptHandles that failed. +func (s *SQS) DeleteBatch(ctx context.Context, queueURL string, receiptHandles []string) error { + entries := make([]types.DeleteMessageBatchRequestEntry, len(receiptHandles)) + for i, receipt := range receiptHandles { + entries[i] = types.DeleteMessageBatchRequestEntry{ + Id: aws.String(fmt.Sprintf("delete-message-%d", i)), + ReceiptHandle: aws.String(receipt), + } + } + + output, err := s.client.DeleteMessageBatch(ctx, &sqs.DeleteMessageBatchInput{ + Entries: entries, + QueueUrl: &queueURL, + }) + if err != nil { + info := make([]DeleteBatchErrorEntry, len(entries)) + for i := range entries { + info[i] = DeleteBatchErrorEntry{ + Index: i, + } + } + return &DeleteBatchError{Err: err, Info: info} + } + if len(output.Failed) > 0 { + info := make([]DeleteBatchErrorEntry, len(output.Failed)) + for i, errorEntry := range output.Failed { + for j, requestEntry := range entries { + if aws.ToString(requestEntry.Id) == aws.ToString(errorEntry.Id) { + info[i] = DeleteBatchErrorEntry{ + Entry: errorEntry, + Index: j, + } + break + } + } + } + return &DeleteBatchError{Info: info} + } + return nil +} + +// DeleteNBatch deletes any number of messages from a given SQS queue via a series of DeleteBatch calls. +// If an error occurs on any or all messages, a DeleteNBatchError is returned that lets +// the caller know the receipt handles that failed. +// Returns the number of API calls to DeleteBatch made. +func (s *SQS) DeleteNBatch(ctx context.Context, queueURL string, receiptHandles []string) (int, error) { + var ( - bodiesLen = len(bodies) - maxlen = 10 - times = int(math.Ceil(float64(bodiesLen) / float64(maxlen))) + receiptCount = len(receiptHandles) + maxlen = 10 + times = int(math.Ceil(float64(receiptCount) / float64(maxlen))) ) + + allErrors := make([]error, 0) + allInfo := make([]DeleteBatchErrorEntry, 0) + + batchesDeleted := 0 + for i := 0; i < times; i++ { batch_end := maxlen * (i + 1) - if maxlen*(i+1) > bodiesLen { - batch_end = bodiesLen + if maxlen*(i+1) > receiptCount { + batch_end = receiptCount + } + var receipt_batch = receiptHandles[maxlen*i : batch_end] + + indexMap := make(map[int]int, 0) + count := 0 + for j := maxlen * i; j < batch_end; j++ { + indexMap[count] = j + count++ } - var bodies_batch = bodies[maxlen*i : batch_end] - err := s.SendBatch(ctx, queueURL, bodies_batch) - if err != nil { - return err + + err := s.DeleteBatch(ctx, queueURL, receipt_batch) + var dbe *DeleteBatchError + if errors.As(err, &dbe) { + allErrors = append(allErrors, err) + + // Update index so that index refers to the position in given receiptHandles slice. + for i := range dbe.Info { + dbe.Info[i].Index = indexMap[dbe.Info[i].Index] + } + + allInfo = append(allInfo, dbe.Info...) } + batchesDeleted++ } - return nil + + if len(allErrors) > 0 { + return batchesDeleted, &DeleteNBatchError{ + Errors: allErrors, + Info: allInfo, + } + } + return batchesDeleted, nil } // GetQueueUrl returns an AWS SQS queue URL given its name. diff --git a/vendor/github.com/GeoNet/kit/metrics/counters.go b/vendor/github.com/GeoNet/kit/metrics/counters.go index 04087cfb..12d6bb52 100644 --- a/vendor/github.com/GeoNet/kit/metrics/counters.go +++ b/vendor/github.com/GeoNet/kit/metrics/counters.go @@ -10,9 +10,9 @@ var msgCounters [4]uint64 var msgLast [4]uint64 var msgCurrent [4]uint64 -var httpCounters [8]uint64 -var httpLast [8]uint64 -var httpCurrent [8]uint64 +var httpCounters [9]uint64 +var httpLast [9]uint64 +var httpCurrent [9]uint64 // A MsgCounters records message counters. type MsgCounters struct { @@ -55,6 +55,9 @@ type HttpCounters struct { // StatusServiceUnavailable is the count of http 503 responses. StatusServiceUnavailable uint64 + // StatusTooManyRequests is the count of http 429 responses. + StatusTooManyRequests uint64 + // Written is the number of bytes written. Written uint64 @@ -97,7 +100,8 @@ func ReadHttpCounters(m *HttpCounters) { m.StatusNotFound = httpCurrent[4] - httpLast[4] m.StatusInternalServerError = httpCurrent[5] - httpLast[5] m.StatusServiceUnavailable = httpCurrent[6] - httpLast[6] - m.Written = httpCurrent[7] - httpLast[7] + m.StatusTooManyRequests = httpCurrent[7] - httpLast[7] + m.Written = httpCurrent[8] - httpLast[8] for i := range httpCounters { httpLast[i] = httpCurrent[i] @@ -159,7 +163,12 @@ func StatusServiceUnavailable() { atomic.AddUint64(&httpCounters[6], 1) } +// StatusTooManyRequests increments the http response 429 counter. It is safe for concurrent access. +func StatusTooManyRequests() { + atomic.AddUint64(&httpCounters[7], 1) +} + // Written increments the bytes sent counter by n. func Written(n int64) { - atomic.AddUint64(&httpCounters[7], uint64(n)) + atomic.AddUint64(&httpCounters[8], uint64(n)) //nolint:gosec } diff --git a/vendor/github.com/GeoNet/kit/metrics/ddog_http.go b/vendor/github.com/GeoNet/kit/metrics/ddog_http.go index 7b7b0d49..f4694ab8 100644 --- a/vendor/github.com/GeoNet/kit/metrics/ddog_http.go +++ b/vendor/github.com/GeoNet/kit/metrics/ddog_http.go @@ -112,6 +112,12 @@ func dogHttp(apiKey, hostName, appName string, m runtime.MemStats, t []TimerStat Type: "counter", Host: hostName, }, + { + Metric: appName + ".http.429", + Points: []point{[2]float64{now, float64(c.StatusTooManyRequests)}}, + Type: "counter", + Host: hostName, + }, { Metric: appName + ".http.500", Points: []point{[2]float64{now, float64(c.StatusInternalServerError)}}, @@ -177,7 +183,7 @@ func dogHttp(apiKey, hostName, appName string, m runtime.MemStats, t []TimerStat } } // non nil connection error, sleep and try again - time.Sleep(time.Second << uint(tries)) + time.Sleep(time.Second << uint(tries)) //nolint:gosec } if res != nil { res.Body.Close() diff --git a/vendor/github.com/GeoNet/kit/metrics/ddog_msg.go b/vendor/github.com/GeoNet/kit/metrics/ddog_msg.go index 7312d238..c102d1da 100644 --- a/vendor/github.com/GeoNet/kit/metrics/ddog_msg.go +++ b/vendor/github.com/GeoNet/kit/metrics/ddog_msg.go @@ -216,7 +216,7 @@ func dogMsg(apiKey, hostName, appName string, m runtime.MemStats, t []TimerStats } } // non nil connection error, sleep and try again - time.Sleep(time.Second << uint(tries)) + time.Sleep(time.Second << uint(tries)) //nolint:gosec } if res != nil { res.Body.Close() diff --git a/vendor/github.com/GeoNet/kit/seis/ms/blockette.go b/vendor/github.com/GeoNet/kit/seis/ms/blockette.go index 4c3d4572..e70d3eb9 100644 --- a/vendor/github.com/GeoNet/kit/seis/ms/blockette.go +++ b/vendor/github.com/GeoNet/kit/seis/ms/blockette.go @@ -126,7 +126,7 @@ func EncodeBlockette1001(blk Blockette1001) []byte { var b [Blockette1001Size]byte b[0] = blk.TimingQuality - b[1] = uint8(blk.MicroSec) + b[1] = uint8(blk.MicroSec) //nolint:gosec b[2] = blk.Reserved b[3] = blk.FrameCount diff --git a/vendor/github.com/GeoNet/kit/seis/ms/btime.go b/vendor/github.com/GeoNet/kit/seis/ms/btime.go index a434c72e..6c6f1523 100644 --- a/vendor/github.com/GeoNet/kit/seis/ms/btime.go +++ b/vendor/github.com/GeoNet/kit/seis/ms/btime.go @@ -36,12 +36,12 @@ func (b BTime) Time() time.Time { // NewBTime builds a BTime from a time.Time. func NewBTime(t time.Time) BTime { return BTime{ - Year: uint16(t.Year()), - Doy: uint16(t.YearDay()), - Hour: uint8(t.Hour()), - Minute: uint8(t.Minute()), - Second: uint8(t.Second()), - S0001: uint16(t.Nanosecond() / 100000), + Year: uint16(t.Year()), //nolint:gosec + Doy: uint16(t.YearDay()), //nolint:gosec + Hour: uint8(t.Hour()), //nolint:gosec + Minute: uint8(t.Minute()), //nolint:gosec + Second: uint8(t.Second()), //nolint:gosec + S0001: uint16(t.Nanosecond() / 100000), //nolint:gosec } } diff --git a/vendor/github.com/GeoNet/kit/seis/ms/decode.go b/vendor/github.com/GeoNet/kit/seis/ms/decode.go index c65e65de..8dce02f5 100644 --- a/vendor/github.com/GeoNet/kit/seis/ms/decode.go +++ b/vendor/github.com/GeoNet/kit/seis/ms/decode.go @@ -7,6 +7,7 @@ import ( ) func decodeInt32(data []byte, order uint8, samples uint16) ([]int32, error) { + //nolint:gosec if n := uint16(len(data) / 4); n < samples { return nil, fmt.Errorf("invalid data length: %d", n) } @@ -20,13 +21,14 @@ func decodeInt32(data []byte, order uint8, samples uint16) ([]int32, error) { default: b = binary.BigEndian.Uint32(data[i:]) } - values = append(values, int32(b)) + values = append(values, int32(b)) //nolint:gosec } return values, nil } func decodeFloat32(data []byte, order uint8, samples uint16) ([]float32, error) { + //nolint:gosec if n := uint16(len(data) / 4); n < samples { return nil, fmt.Errorf("invalid data length: %d", n) } @@ -45,6 +47,7 @@ func decodeFloat32(data []byte, order uint8, samples uint16) ([]float32, error) } func decodeFloat64(data []byte, order uint8, samples uint16) ([]float64, error) { + //nolint:gosec if n := uint16(len(data) / 8); n < samples { return nil, fmt.Errorf("invalid data length: %d", n) } diff --git a/vendor/github.com/GeoNet/kit/seis/ms/header.go b/vendor/github.com/GeoNet/kit/seis/ms/header.go index 55c3b193..13c26ee8 100644 --- a/vendor/github.com/GeoNet/kit/seis/ms/header.go +++ b/vendor/github.com/GeoNet/kit/seis/ms/header.go @@ -107,7 +107,7 @@ func (h *RecordHeader) SetCorrection(correction time.Duration, applied bool) { default: h.ActivityFlags = clearBit(h.ActivityFlags, 1) } - h.TimeCorrection = int32(correction / (100 * time.Microsecond)) + h.TimeCorrection = int32(correction / (100 * time.Microsecond)) //nolint:gosec } func (h RecordHeader) Correction() time.Duration { @@ -260,18 +260,17 @@ func DecodeRecordHeader(data []byte) RecordHeader { RecordStartTime: DecodeBTime(h[20:30]), NumberOfSamples: binary.BigEndian.Uint16(h[30:32]), - SampleRateFactor: int16(binary.BigEndian.Uint16(h[32:34])), - SampleRateMultiplier: int16(binary.BigEndian.Uint16(h[34:36])), + SampleRateFactor: int16(binary.BigEndian.Uint16(h[32:34])), //nolint:gosec + SampleRateMultiplier: int16(binary.BigEndian.Uint16(h[34:36])), //nolint:gosec ActivityFlags: h[36], IOAndClockFlags: h[37], DataQualityFlags: h[38], NumberOfBlockettesThatFollow: h[39], - - TimeCorrection: int32(binary.BigEndian.Uint32(h[40:44])), - BeginningOfData: binary.BigEndian.Uint16(h[44:46]), - FirstBlockette: binary.BigEndian.Uint16(h[46:48]), + TimeCorrection: int32(binary.BigEndian.Uint32(h[40:44])), //nolint:gosec + BeginningOfData: binary.BigEndian.Uint16(h[44:46]), + FirstBlockette: binary.BigEndian.Uint16(h[46:48]), } } @@ -290,15 +289,15 @@ func EncodeRecordHeader(hdr RecordHeader) []byte { copy(b[20:30], EncodeBTime(hdr.RecordStartTime)) binary.BigEndian.PutUint16(b[30:32], hdr.NumberOfSamples) - binary.BigEndian.PutUint16(b[32:34], uint16(hdr.SampleRateFactor)) - binary.BigEndian.PutUint16(b[34:36], uint16(hdr.SampleRateMultiplier)) + binary.BigEndian.PutUint16(b[32:34], uint16(hdr.SampleRateFactor)) //nolint:gosec + binary.BigEndian.PutUint16(b[34:36], uint16(hdr.SampleRateMultiplier)) //nolint:gosec b[36] = hdr.ActivityFlags b[37] = hdr.IOAndClockFlags b[38] = hdr.DataQualityFlags b[39] = hdr.NumberOfBlockettesThatFollow - binary.BigEndian.PutUint32(b[40:44], uint32(hdr.TimeCorrection)) + binary.BigEndian.PutUint32(b[40:44], uint32(hdr.TimeCorrection)) //nolint:gosec binary.BigEndian.PutUint16(b[44:46], hdr.BeginningOfData) binary.BigEndian.PutUint16(b[46:48], hdr.FirstBlockette) diff --git a/vendor/github.com/GeoNet/kit/seis/ms/steim.go b/vendor/github.com/GeoNet/kit/seis/ms/steim.go index 6abafc11..107e83ea 100644 --- a/vendor/github.com/GeoNet/kit/seis/ms/steim.go +++ b/vendor/github.com/GeoNet/kit/seis/ms/steim.go @@ -6,11 +6,13 @@ import ( ) func getNibble(word []byte, index int) uint8 { - b := word[index/4] //Which byte we want from within the word (4 bytes per word) - var res uint8 //value - i := index % 4 //which nibble we want from within the byte (4 nibbles per byte) + b := word[index/4] //Which byte we want from within the word (4 bytes per word) + var res uint8 //value + i := index % 4 //which nibble we want from within the byte (4 nibbles per byte) + //nolint:gosec res = b & (0x3 << uint8((3-i)*2)) //0x3=00000011 create and apply the correct mask e.g. i=1 mask=00110000 - res = res >> uint8((3-i)*2) //shift the masked value fully to the right + //nolint:gosec + res = res >> uint8((3-i)*2) //shift the masked value fully to the right return res } @@ -18,6 +20,7 @@ func getNibble(word []byte, index int) uint8 { func writeNibble(word []byte, index int, value uint8) { b := word[index/4] i := index % 4 + //nolint:gosec b = b ^ (value << uint8((3-i)*2)) //set the bits word[index/4] = b } @@ -32,7 +35,7 @@ func uintVarToInt32(v uint32, numbits uint8) int32 { v = v + 1 //add 1 - positive nbit number v = -v //get the negative - this gives us a proper negative int32 } - return int32(v) + return int32(v) //nolint:gosec } func int32ToUintVar(i int32, numbits uint8) uint32 { @@ -42,7 +45,7 @@ func int32ToUintVar(i int32, numbits uint8) uint32 { i = i - 1 i = i ^ ((1 << (numbits)) - 1) //flip all the bits } - return uint32(i) + return uint32(i) //nolint:gosec } func applyDifferencesFromWord(w []byte, numdiffs int, diffbits uint32, d []int32) []int32 { @@ -50,9 +53,11 @@ func applyDifferencesFromWord(w []byte, numdiffs int, diffbits uint32, d []int32 wint := binary.BigEndian.Uint32(w) for i := numdiffs - 1; i >= 0; i-- { + //nolint:gosec intn := wint & uint32(mask<<(uint32(i)*diffbits)) //apply a mask over the correct bits - intn = intn >> (uint32(i) * diffbits) //shift the masked value fully to the right - + //nolint:gosec + intn = intn >> (uint32(i) * diffbits) //shift the masked value fully to the right + //nolint:gosec diff := uintVarToInt32(intn, uint8(diffbits)) //convert diffbits bit int to int32 d = append(d, d[len(d)-1]+diff) @@ -70,8 +75,8 @@ func decodeSteim(version int, raw []byte, wordOrder, frameCount uint8, expectedS //Word 1 and 2 contain x0 and xn: the uncompressed initial and final quantities (word 0 contains nibs) frame0 := raw[0:64] - start := int32(binary.BigEndian.Uint32(frame0[4:8])) - end := int32(binary.BigEndian.Uint32(frame0[8:12])) + start := int32(binary.BigEndian.Uint32(frame0[4:8])) //nolint:gosec + end := int32(binary.BigEndian.Uint32(frame0[8:12])) //nolint:gosec d = append(d, start) diff --git a/vendor/github.com/GeoNet/kit/seis/ms/unpack.go b/vendor/github.com/GeoNet/kit/seis/ms/unpack.go index 10fc28de..57519dfb 100644 --- a/vendor/github.com/GeoNet/kit/seis/ms/unpack.go +++ b/vendor/github.com/GeoNet/kit/seis/ms/unpack.go @@ -85,7 +85,7 @@ func (m Record) Int32s() ([]int32, error) { case EncodingInt32: return decodeInt32(m.Data, m.B1000.WordOrder, m.RecordHeader.NumberOfSamples) case EncodingSTEIM1: - framecount := uint8(len(m.Data) / 64) + framecount := uint8(len(m.Data) / 64) //nolint:gosec if m.B1001.FrameCount != 0 { framecount = m.B1001.FrameCount } @@ -94,7 +94,7 @@ func (m Record) Int32s() ([]int32, error) { } return decodeSteim(1, m.Data, m.B1000.WordOrder, framecount, m.RecordHeader.NumberOfSamples) case EncodingSTEIM2: //STEIM2 - framecount := uint8(len(m.Data) / 64) + framecount := uint8(len(m.Data) / 64) //nolint:gosec if m.B1001.FrameCount != 0 { framecount = m.B1001.FrameCount } diff --git a/vendor/github.com/GeoNet/kit/weft/handlers.go b/vendor/github.com/GeoNet/kit/weft/handlers.go index 36dd0dfb..7eb3aec1 100644 --- a/vendor/github.com/GeoNet/kit/weft/handlers.go +++ b/vendor/github.com/GeoNet/kit/weft/handlers.go @@ -304,6 +304,9 @@ func writeResponseAndLogMetrics(err error, w http.ResponseWriter, r *http.Reques case http.StatusServiceUnavailable: metrics.StatusServiceUnavailable() logger.Printf("%d %s %s %s %s", status, r.Method, r.RequestURI, name, err.Error()) + case http.StatusTooManyRequests: + metrics.StatusTooManyRequests() + logger.Printf("%d %s %s %s %s", status, r.Method, r.RequestURI, name, err.Error()) } } diff --git a/vendor/modules.txt b/vendor/modules.txt index 56c6d9ea..aa650931 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1,5 +1,5 @@ -# github.com/GeoNet/kit v0.0.0-20241129025613-745247c4fb1c -## explicit; go 1.21 +# github.com/GeoNet/kit v0.0.0-20250803205759-df08ce98e1ed +## explicit; go 1.23 github.com/GeoNet/kit/aws/s3 github.com/GeoNet/kit/aws/sqs github.com/GeoNet/kit/cfg